Skip to main content

helios_persistence/backends/mongodb/
search_impl.rs

1//! Search and conditional-operation implementation for MongoDB backend.
2
3use std::collections::HashSet;
4
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use helios_fhir::FhirVersion;
8use mongodb::{
9    Cursor,
10    bson::{self, Bson, DateTime as BsonDateTime, Document, doc},
11};
12use regex::escape as regex_escape;
13use serde_json::Value;
14
15use crate::core::{
16    ConditionalCreateResult, ConditionalDeleteResult, ConditionalPatchResult, ConditionalStorage,
17    ConditionalUpdateResult, IncludeProvider, PatchFormat, ResourceStorage, RevincludeProvider,
18    SearchProvider, SearchResult,
19};
20use crate::error::{BackendError, SearchError, StorageError, StorageResult};
21use crate::tenant::TenantContext;
22use crate::types::{
23    CompartmentMembership, CursorDirection, CursorValue, IncludeDirective, IncludeType, Page,
24    PageCursor, PageInfo, SearchModifier, SearchParamType, SearchParameter, SearchPrefix,
25    SearchQuery, SearchValue, StoredResource, strip_reference_version,
26};
27
28use super::MongoBackend;
29
30fn internal_error(message: String) -> StorageError {
31    StorageError::Backend(BackendError::Internal {
32        backend_name: "mongodb".to_string(),
33        message,
34        source: None,
35    })
36}
37
38fn serialization_error(message: String) -> StorageError {
39    StorageError::Backend(BackendError::SerializationError { message })
40}
41
42fn bson_to_chrono(dt: &BsonDateTime) -> DateTime<Utc> {
43    DateTime::<Utc>::from_timestamp_millis(dt.timestamp_millis()).unwrap_or_else(Utc::now)
44}
45
46fn chrono_to_bson(dt: DateTime<Utc>) -> BsonDateTime {
47    BsonDateTime::from_millis(dt.timestamp_millis())
48}
49
50fn parse_date_for_query(value: &str) -> Option<DateTime<Utc>> {
51    let normalized = if value.contains('T') {
52        if value.contains('Z') || value.contains('+') || value.matches('-').count() > 2 {
53            value.to_string()
54        } else {
55            format!("{}+00:00", value)
56        }
57    } else if value.len() == 10 {
58        format!("{}T00:00:00+00:00", value)
59    } else if value.len() == 7 {
60        format!("{}-01T00:00:00+00:00", value)
61    } else if value.len() == 4 {
62        format!("{}-01-01T00:00:00+00:00", value)
63    } else {
64        value.to_string()
65    };
66
67    DateTime::parse_from_rfc3339(&normalized)
68        .ok()
69        .map(|dt| dt.with_timezone(&Utc))
70}
71
72async fn collect_documents(mut cursor: Cursor<Document>) -> StorageResult<Vec<Document>> {
73    let mut docs = Vec::new();
74    while cursor
75        .advance()
76        .await
77        .map_err(|e| internal_error(format!("Failed to advance MongoDB cursor: {}", e)))?
78    {
79        let doc = cursor.deserialize_current().map_err(|e| {
80            internal_error(format!("Failed to deserialize MongoDB document: {}", e))
81        })?;
82        docs.push(doc);
83    }
84    Ok(docs)
85}
86
87/// Finds the `contained[]` entry with the given local `id` in a container's
88/// content.
89fn extract_contained_resource(content: &Value, local_id: &str) -> Option<Value> {
90    content
91        .get("contained")?
92        .as_array()?
93        .iter()
94        .find(|e| e.get("id").and_then(|v| v.as_str()) == Some(local_id))
95        .cloned()
96}
97
98/// Builds a `StoredResource` for a contained resource, inheriting the
99/// container's version/tenant/timestamps. Used for `_containedType=contained`.
100fn build_contained_stored(
101    container: &StoredResource,
102    contained_type: &str,
103    local_id: &str,
104    content: Value,
105) -> StoredResource {
106    StoredResource::from_storage(
107        contained_type.to_string(),
108        local_id.to_string(),
109        container.version_id().to_string(),
110        container.tenant_id().clone(),
111        content,
112        container.created_at(),
113        container.last_modified(),
114        None,
115        container.fhir_version(),
116    )
117}
118
/// Splits a `name=value&name=value` string into `(name, value)` pairs.
///
/// Each `&`-separated segment is split at its first `=`; segments without an
/// `=` are skipped. No decoding or trimming is applied.
fn parse_simple_search_params(params: &str) -> Vec<(String, String)> {
    let mut pairs = Vec::new();
    for segment in params.split('&') {
        if let Some((name, value)) = segment.split_once('=') {
            pairs.push((name.to_owned(), value.to_owned()));
        }
    }
    pairs
}
128
129#[async_trait]
130impl SearchProvider for MongoBackend {
131    async fn search(
132        &self,
133        tenant: &TenantContext,
134        query: &SearchQuery,
135    ) -> StorageResult<SearchResult> {
136        // `_contained` search uses a dedicated path (separate index rows and
137        // heterogeneous result types); standard search handles `_contained=false`
138        // (contained rows carry the container's resource_type, so the standard
139        // type-scoped filter naturally excludes them).
140        if query.contained != crate::types::ContainedMode::Off {
141            return self.search_contained(tenant, query).await;
142        }
143
144        self.validate_query_support(query)?;
145
146        let db = self.get_database().await?;
147        let resources = db.collection::<Document>(MongoBackend::RESOURCES_COLLECTION);
148        let tenant_id = tenant.tenant_id().as_str();
149
150        let cursor = if let Some(cursor_str) = &query.cursor {
151            Some(PageCursor::decode(cursor_str).map_err(|_| {
152                StorageError::Search(SearchError::InvalidCursor {
153                    cursor: cursor_str.clone(),
154                })
155            })?)
156        } else {
157            None
158        };
159
160        if cursor.is_some() && !query.sort.is_empty() {
161            return Err(StorageError::Search(SearchError::QueryParseError {
162                message:
163                    "MongoDB cursor pagination currently supports only default _lastUpdated sort"
164                        .to_string(),
165            }));
166        }
167
168        let previous_mode = cursor
169            .as_ref()
170            .is_some_and(|c| c.direction() == CursorDirection::Previous);
171
172        let matched_ids = self
173            .matching_resource_ids(&db, tenant_id, &query.resource_type, query)
174            .await?;
175
176        let filter = self.build_resource_filter(
177            tenant_id,
178            &query.resource_type,
179            query,
180            matched_ids.as_ref(),
181            cursor.as_ref(),
182        )?;
183
184        let sort = self.build_sort_document(query, previous_mode)?;
185        let page_size = query.count.unwrap_or(100).max(1) as usize;
186
187        let mut find_action = resources
188            .find(filter)
189            .sort(sort)
190            .limit((page_size + 1) as i64);
191
192        if cursor.is_none() {
193            if let Some(offset) = query.offset {
194                find_action = find_action.skip(offset as u64);
195            }
196        }
197
198        let docs = collect_documents(
199            find_action
200                .await
201                .map_err(|e| internal_error(format!("Failed to execute MongoDB search: {}", e)))?,
202        )
203        .await?;
204
205        let mut resources = docs
206            .into_iter()
207            .map(|doc| self.document_to_stored_resource(tenant, &query.resource_type, doc))
208            .collect::<StorageResult<Vec<_>>>()?;
209
210        if previous_mode {
211            resources.reverse();
212        }
213
214        let has_next = resources.len() > page_size;
215        if has_next {
216            let _ = resources.pop();
217        }
218
219        let has_previous = cursor.is_some() || query.offset.unwrap_or(0) > 0;
220
221        let next_cursor = if has_next {
222            resources.last().map(|resource| {
223                PageCursor::new(
224                    vec![CursorValue::String(resource.last_modified().to_rfc3339())],
225                    resource.id(),
226                )
227                .encode()
228            })
229        } else {
230            None
231        };
232
233        let previous_cursor = if has_previous {
234            resources.first().map(|resource| {
235                PageCursor::previous(
236                    vec![CursorValue::String(resource.last_modified().to_rfc3339())],
237                    resource.id(),
238                )
239                .encode()
240            })
241        } else {
242            None
243        };
244
245        let total = if query.total.is_some() {
246            Some(self.search_count(tenant, query).await?)
247        } else {
248            None
249        };
250
251        let page_info = PageInfo {
252            next_cursor,
253            previous_cursor,
254            total,
255            has_next,
256            has_previous,
257        };
258
259        let page = Page::new(resources, page_info);
260        let mut included: Vec<StoredResource> = Vec::new();
261
262        if !query.includes.is_empty() {
263            let forward: Vec<IncludeDirective> = query
264                .includes
265                .iter()
266                .filter(|i| i.include_type == IncludeType::Include)
267                .cloned()
268                .collect();
269            if !forward.is_empty() {
270                let resolved = self.resolve_includes(tenant, &page.items, &forward).await?;
271                Self::merge_unique(&mut included, resolved);
272            }
273
274            let reverse: Vec<IncludeDirective> = query
275                .includes
276                .iter()
277                .filter(|i| i.include_type == IncludeType::Revinclude)
278                .cloned()
279                .collect();
280            if !reverse.is_empty() {
281                let resolved = self
282                    .resolve_revincludes(tenant, &page.items, &reverse)
283                    .await?;
284                Self::merge_unique(&mut included, resolved);
285            }
286        }
287
288        Ok(SearchResult {
289            resources: page,
290            included,
291            total,
292            scores: Default::default(),
293        })
294    }
295
296    async fn search_count(
297        &self,
298        tenant: &TenantContext,
299        query: &SearchQuery,
300    ) -> StorageResult<u64> {
301        self.validate_query_support(query)?;
302
303        let db = self.get_database().await?;
304        let resources = db.collection::<Document>(MongoBackend::RESOURCES_COLLECTION);
305        let tenant_id = tenant.tenant_id().as_str();
306
307        let matched_ids = self
308            .matching_resource_ids(&db, tenant_id, &query.resource_type, query)
309            .await?;
310
311        let filter = self.build_resource_filter(
312            tenant_id,
313            &query.resource_type,
314            query,
315            matched_ids.as_ref(),
316            None,
317        )?;
318
319        resources
320            .count_documents(filter)
321            .await
322            .map_err(|e| internal_error(format!("Failed to count MongoDB search results: {}", e)))
323    }
324
325    fn search_param_registry(
326        &self,
327    ) -> &std::sync::Arc<parking_lot::RwLock<crate::search::SearchParameterRegistry>> {
328        self.search_registry()
329    }
330
331    fn supports_contained_search(&self) -> bool {
332        true
333    }
334
335    fn modifiers_for_param_type(
336        &self,
337        param_type: crate::types::SearchParamType,
338    ) -> Vec<&'static str> {
339        Self::modifiers_for_type(param_type)
340    }
341}
342
343#[async_trait]
344impl ConditionalStorage for MongoBackend {
345    async fn conditional_create(
346        &self,
347        tenant: &TenantContext,
348        resource_type: &str,
349        resource: Value,
350        search_params: &str,
351        fhir_version: FhirVersion,
352    ) -> StorageResult<ConditionalCreateResult> {
353        let matches = self
354            .find_matching_resources(tenant, resource_type, search_params)
355            .await?;
356
357        match matches.len() {
358            0 => {
359                let created = self
360                    .create(tenant, resource_type, resource, fhir_version)
361                    .await?;
362                Ok(ConditionalCreateResult::Created(created))
363            }
364            1 => Ok(ConditionalCreateResult::Exists(
365                matches.into_iter().next().expect("single match must exist"),
366            )),
367            n => Ok(ConditionalCreateResult::MultipleMatches(n)),
368        }
369    }
370
371    async fn conditional_update(
372        &self,
373        tenant: &TenantContext,
374        resource_type: &str,
375        resource: Value,
376        search_params: &str,
377        upsert: bool,
378        fhir_version: FhirVersion,
379    ) -> StorageResult<ConditionalUpdateResult> {
380        let matches = self
381            .find_matching_resources(tenant, resource_type, search_params)
382            .await?;
383
384        match matches.len() {
385            0 => {
386                if upsert {
387                    let created = self
388                        .create(tenant, resource_type, resource, fhir_version)
389                        .await?;
390                    Ok(ConditionalUpdateResult::Created(created))
391                } else {
392                    Ok(ConditionalUpdateResult::NoMatch)
393                }
394            }
395            1 => {
396                let current = matches.into_iter().next().expect("single match must exist");
397                let updated = self.update(tenant, &current, resource).await?;
398                Ok(ConditionalUpdateResult::Updated(updated))
399            }
400            n => Ok(ConditionalUpdateResult::MultipleMatches(n)),
401        }
402    }
403
404    async fn conditional_delete(
405        &self,
406        tenant: &TenantContext,
407        resource_type: &str,
408        search_params: &str,
409    ) -> StorageResult<ConditionalDeleteResult> {
410        let matches = self
411            .find_matching_resources(tenant, resource_type, search_params)
412            .await?;
413
414        match matches.len() {
415            0 => Ok(ConditionalDeleteResult::NoMatch),
416            1 => {
417                let current = matches.into_iter().next().expect("single match must exist");
418                self.delete(tenant, resource_type, current.id()).await?;
419                Ok(ConditionalDeleteResult::Deleted)
420            }
421            n => Ok(ConditionalDeleteResult::MultipleMatches(n)),
422        }
423    }
424
425    async fn conditional_patch(
426        &self,
427        tenant: &TenantContext,
428        resource_type: &str,
429        search_params: &str,
430        patch: &PatchFormat,
431    ) -> StorageResult<ConditionalPatchResult> {
432        let _ = (tenant, resource_type, search_params, patch);
433        Err(StorageError::Backend(BackendError::UnsupportedCapability {
434            backend_name: "mongodb".to_string(),
435            capability: "conditional_patch".to_string(),
436        }))
437    }
438}
439
440impl MongoBackend {
441    /// Executes a `_contained=true|both` search (see the SQLite backend's
442    /// `search_contained` for shared semantics). Returns containers (default) or
443    /// contained resources (`_containedType=contained`); `both` merges top-level
444    /// matches first. Single window (no keyset cursor).
445    async fn search_contained(
446        &self,
447        tenant: &TenantContext,
448        query: &SearchQuery,
449    ) -> StorageResult<SearchResult> {
450        use crate::types::{ContainedMode, ContainedReturn};
451
452        let db = self.get_database().await?;
453        let tenant_id = tenant.tenant_id().as_str();
454        let contained_type = query.resource_type.as_str();
455
456        let matches = self
457            .matching_contained(&db, tenant_id, contained_type, query)
458            .await?;
459
460        let mut items: Vec<StoredResource> = Vec::new();
461        let mut seen: HashSet<String> = HashSet::new();
462        match query.contained_return {
463            ContainedReturn::Container => {
464                for (ctype, cid, _) in &matches {
465                    if !seen.insert(format!("{ctype}/{cid}")) {
466                        continue;
467                    }
468                    if let Some(container) = self.read(tenant, ctype, cid).await? {
469                        items.push(container);
470                    }
471                }
472            }
473            ContainedReturn::Contained => {
474                for (ctype, cid, local) in &matches {
475                    let Some(local_id) = local else { continue };
476                    if !seen.insert(format!("{ctype}/{cid}#{local_id}")) {
477                        continue;
478                    }
479                    if let Some(container) = self.read(tenant, ctype, cid).await? {
480                        if let Some(c) = extract_contained_resource(container.content(), local_id) {
481                            items.push(build_contained_stored(
482                                &container,
483                                contained_type,
484                                local_id,
485                                c,
486                            ));
487                        }
488                    }
489                }
490            }
491        }
492
493        if query.contained == ContainedMode::Both {
494            let mut top_query = query.clone();
495            top_query.contained = ContainedMode::Off;
496            top_query.contained_return = ContainedReturn::Container;
497            let top = self.search(tenant, &top_query).await?;
498            let mut merged = top.resources.items;
499            let top_urls: HashSet<String> = merged.iter().map(|r| r.url()).collect();
500            for item in items {
501                if !top_urls.contains(&item.url()) {
502                    merged.push(item);
503                }
504            }
505            items = merged;
506        }
507
508        let count = query.count.unwrap_or(100) as usize;
509        let offset = query.offset.unwrap_or(0) as usize;
510        let total = if query.total.is_some() {
511            Some(items.len() as u64)
512        } else {
513            None
514        };
515        let windowed: Vec<StoredResource> = items.into_iter().skip(offset).take(count).collect();
516        let page = Page::new(windowed, PageInfo::end());
517        let mut result = SearchResult::new(page);
518        if let Some(t) = total {
519            result = result.with_total(t);
520        }
521        Ok(result)
522    }
523
524    /// Resolves `_contained` matches via an aggregation over `search_index`,
525    /// grouping contained rows by the contained entity
526    /// `(resource_type, resource_id, contained_local_id)` and requiring every
527    /// searched parameter to be present on that entity. Returns
528    /// `(container_type, container_id, contained_local_id)` tuples.
529    async fn matching_contained(
530        &self,
531        db: &mongodb::Database,
532        tenant_id: &str,
533        contained_type: &str,
534        query: &SearchQuery,
535    ) -> StorageResult<Vec<(String, String, Option<String>)>> {
536        let search_index = db.collection::<Document>(MongoBackend::SEARCH_INDEX_COLLECTION);
537
538        let mut branches: Vec<Bson> = Vec::new();
539        let mut distinct_names: Vec<String> = Vec::new();
540        for param in &query.parameters {
541            if param.name.starts_with('_')
542                || matches!(
543                    param.param_type,
544                    SearchParamType::Composite | SearchParamType::Special
545                )
546            {
547                continue;
548            }
549            // Reuse the standard per-param value filter, dropping the tenant /
550            // resource_type scoping (handled by the pipeline's top `$match`).
551            let mut branch = self.build_search_index_filter("", "", param)?;
552            branch.remove("tenant_id");
553            branch.remove("resource_type");
554            branches.push(Bson::Document(branch));
555            if !distinct_names.contains(&param.name) {
556                distinct_names.push(param.name.clone());
557            }
558        }
559        if branches.is_empty() {
560            return Ok(Vec::new());
561        }
562
563        let mut pipeline = vec![
564            doc! { "$match": {
565                "tenant_id": tenant_id,
566                "is_contained": true,
567                "contained_type": contained_type,
568                "$or": branches,
569            }},
570            doc! { "$group": {
571                "_id": {
572                    "rtype": "$resource_type",
573                    "rid": "$resource_id",
574                    "lid": "$contained_local_id",
575                },
576                "names": { "$addToSet": "$param_name" },
577            }},
578        ];
579        if distinct_names.len() > 1 {
580            pipeline.push(doc! { "$match": { "names": { "$all": distinct_names } } });
581        }
582
583        let cursor = search_index
584            .aggregate(pipeline)
585            .await
586            .map_err(|e| internal_error(format!("Failed to aggregate contained search: {}", e)))?;
587        let docs = collect_documents(cursor).await?;
588
589        let mut out = Vec::new();
590        for doc in docs {
591            if let Ok(id) = doc.get_document("_id") {
592                let rtype = id.get_str("rtype").unwrap_or_default().to_string();
593                let rid = id.get_str("rid").unwrap_or_default().to_string();
594                let lid = id.get_str("lid").ok().map(ToString::to_string);
595                if !rtype.is_empty() && !rid.is_empty() {
596                    out.push((rtype, rid, lid));
597                }
598            }
599        }
600        Ok(out)
601    }
602
603    fn validate_query_support(&self, query: &SearchQuery) -> StorageResult<()> {
604        if query.parameters.iter().any(|param| !param.chain.is_empty()) {
605            return Err(StorageError::Search(
606                SearchError::ChainedSearchNotSupported {
607                    chain: "forward chain".to_string(),
608                },
609            ));
610        }
611
612        if !query.reverse_chains.is_empty() {
613            return Err(StorageError::Search(SearchError::ReverseChainNotSupported));
614        }
615
616        for param in &query.parameters {
617            if matches!(
618                param.modifier,
619                Some(SearchModifier::Above)
620                    | Some(SearchModifier::Below)
621                    | Some(SearchModifier::In)
622                    | Some(SearchModifier::NotIn)
623            ) {
624                return Err(StorageError::Search(SearchError::UnsupportedModifier {
625                    modifier: param
626                        .modifier
627                        .as_ref()
628                        .map(ToString::to_string)
629                        .unwrap_or_default(),
630                    param_type: param.param_type.to_string(),
631                }));
632            }
633        }
634
635        Ok(())
636    }
637
638    async fn matching_resource_ids(
639        &self,
640        db: &mongodb::Database,
641        tenant_id: &str,
642        resource_type: &str,
643        query: &SearchQuery,
644    ) -> StorageResult<Option<HashSet<String>>> {
645        let search_index = db.collection::<Document>(MongoBackend::SEARCH_INDEX_COLLECTION);
646        let mut matched: Option<HashSet<String>> = None;
647
648        for param in &query.parameters {
649            if matches!(param.name.as_str(), "_id" | "_lastUpdated") {
650                continue;
651            }
652
653            let filter = self.build_search_index_filter(tenant_id, resource_type, param)?;
654
655            let ids = search_index
656                .distinct("resource_id", filter)
657                .await
658                .map_err(|e| internal_error(format!("Failed to query search_index: {}", e)))?
659                .into_iter()
660                .filter_map(|value| value.as_str().map(ToString::to_string))
661                .collect::<HashSet<_>>();
662
663            if ids.is_empty() {
664                return Ok(Some(HashSet::new()));
665            }
666
667            matched = Some(match matched {
668                Some(current) => current
669                    .intersection(&ids)
670                    .cloned()
671                    .collect::<HashSet<String>>(),
672                None => ids,
673            });
674
675            if matched.as_ref().is_some_and(|set| set.is_empty()) {
676                return Ok(matched);
677            }
678        }
679
680        // Compartment membership: a resource joins the compartment if it
681        // references the compartment via ANY of the membership params (OR),
682        // per the FHIR CompartmentDefinition. Computed as a single OR query
683        // over the search_index and intersected with the parameter matches.
684        if let Some(comp) = &query.compartment {
685            if let Some(ids) = self
686                .compartment_resource_ids(&search_index, tenant_id, resource_type, comp)
687                .await?
688            {
689                if ids.is_empty() {
690                    return Ok(Some(HashSet::new()));
691                }
692                matched = Some(match matched {
693                    Some(current) => current
694                        .intersection(&ids)
695                        .cloned()
696                        .collect::<HashSet<String>>(),
697                    None => ids,
698                });
699            }
700        }
701
702        Ok(matched)
703    }
704
705    /// Returns the resource IDs that are members of the compartment described by
706    /// `comp`: resources that reference `comp.reference` through ANY of the
707    /// membership params (logical OR). Reference matching is version-agnostic
708    /// (mirrors the reference handler): the stored reference must equal the base
709    /// reference or carry a `/_history/<vid>` suffix.
710    ///
711    /// Returns `Ok(None)` when `comp` carries no params or no reference (no
712    /// restriction to apply), otherwise `Ok(Some(ids))` (possibly empty).
713    async fn compartment_resource_ids(
714        &self,
715        search_index: &mongodb::Collection<Document>,
716        tenant_id: &str,
717        resource_type: &str,
718        comp: &CompartmentMembership,
719    ) -> StorageResult<Option<HashSet<String>>> {
720        if comp.params.is_empty() || comp.reference.is_empty() {
721            return Ok(None);
722        }
723
724        let base = strip_reference_version(&comp.reference);
725        let params: Vec<Bson> = comp.params.iter().cloned().map(Bson::String).collect();
726
727        let filter = doc! {
728            "tenant_id": tenant_id,
729            "resource_type": resource_type,
730            "param_name": { "$in": Bson::Array(params) },
731            "$or": [
732                { "value_reference": &base },
733                { "value_reference": { "$regex": format!("^{}/_history/", regex_escape(base)) } },
734            ],
735        };
736
737        let ids = search_index
738            .distinct("resource_id", filter)
739            .await
740            .map_err(|e| internal_error(format!("Failed to query search_index: {}", e)))?
741            .into_iter()
742            .filter_map(|value| value.as_str().map(ToString::to_string))
743            .collect::<HashSet<_>>();
744
745        Ok(Some(ids))
746    }
747
748    fn build_search_index_filter(
749        &self,
750        tenant_id: &str,
751        resource_type: &str,
752        param: &SearchParameter,
753    ) -> StorageResult<Document> {
754        if param.values.is_empty() {
755            return Err(StorageError::Search(SearchError::QueryParseError {
756                message: format!("Search parameter '{}' has no values", param.name),
757            }));
758        }
759
760        let mut filter = doc! {
761            "tenant_id": tenant_id,
762            "resource_type": resource_type,
763            "param_name": &param.name,
764        };
765
766        let value_filters = param
767            .values
768            .iter()
769            .map(|value| self.build_index_value_filter(param, value))
770            .collect::<StorageResult<Vec<_>>>()?;
771
772        if value_filters.len() == 1 {
773            if let Some(single) = value_filters.into_iter().next() {
774                for (key, value) in single {
775                    filter.insert(key, value);
776                }
777            }
778            return Ok(filter);
779        }
780
781        let combine_with_and = matches!(
782            param.param_type,
783            SearchParamType::Date | SearchParamType::Number
784        );
785        let operator = if combine_with_and { "$and" } else { "$or" };
786        filter.insert(
787            operator,
788            Bson::Array(value_filters.into_iter().map(Bson::Document).collect()),
789        );
790
791        Ok(filter)
792    }
793
794    fn build_index_value_filter(
795        &self,
796        param: &SearchParameter,
797        value: &SearchValue,
798    ) -> StorageResult<Document> {
799        match param.name.as_str() {
800            "_text" | "_content" => {
801                return Err(StorageError::Search(SearchError::TextSearchNotAvailable));
802            }
803            "_id" | "_lastUpdated" => {
804                return Err(StorageError::Search(SearchError::QueryParseError {
805                    message: format!(
806                        "Special parameter '{}' should be resolved against resources, not search_index",
807                        param.name
808                    ),
809                }));
810            }
811            _ => {}
812        }
813
814        match param.param_type {
815            SearchParamType::String => self.build_string_filter(param, value),
816            SearchParamType::Token => self.build_token_filter(param, value),
817            SearchParamType::Date => self.build_date_filter(value, "value_date"),
818            SearchParamType::Number => self.build_number_filter(value),
819            SearchParamType::Reference => self.build_reference_filter(param, value),
820            SearchParamType::Uri => self.build_uri_filter(param, value),
821            SearchParamType::Quantity => self.build_quantity_filter(value),
822            SearchParamType::Composite => {
823                Err(StorageError::Search(SearchError::InvalidComposite {
824                    message: "Composite search is not supported in MongoDB Phase 4".to_string(),
825                }))
826            }
827            SearchParamType::Special => Err(StorageError::Search(
828                SearchError::UnsupportedParameterType {
829                    param_type: format!("special parameter {}", param.name),
830                },
831            )),
832        }
833    }
834
835    fn build_string_filter(
836        &self,
837        param: &SearchParameter,
838        value: &SearchValue,
839    ) -> StorageResult<Document> {
840        if value.prefix != SearchPrefix::Eq {
841            return Err(StorageError::Search(SearchError::QueryParseError {
842                message: format!(
843                    "Unsupported prefix '{}' for string parameter '{}'",
844                    value.prefix, param.name
845                ),
846            }));
847        }
848
849        let lowered = value.value.to_lowercase();
850        match param.modifier.as_ref() {
851            None => Ok(doc! {
852                "value_string": {
853                    "$regex": format!("^{}", regex_escape(&lowered))
854                }
855            }),
856            Some(SearchModifier::Exact) => Ok(doc! { "value_string": lowered }),
857            // `:text` on a string is a case-insensitive partial match,
858            // implemented here as a substring match (same as `:contains`).
859            Some(SearchModifier::Contains | SearchModifier::Text) => Ok(doc! {
860                "value_string": {
861                    "$regex": regex_escape(&lowered)
862                }
863            }),
864            Some(other) => Err(StorageError::Search(SearchError::UnsupportedModifier {
865                modifier: other.to_string(),
866                param_type: "string".to_string(),
867            })),
868        }
869    }
870
871    fn build_token_filter(
872        &self,
873        param: &SearchParameter,
874        value: &SearchValue,
875    ) -> StorageResult<Document> {
876        if value.prefix != SearchPrefix::Eq {
877            return Err(StorageError::Search(SearchError::QueryParseError {
878                message: format!(
879                    "Unsupported prefix '{}' for token parameter '{}'",
880                    value.prefix, param.name
881                ),
882            }));
883        }
884
885        match param.modifier.as_ref() {
886            None => {}
887            // `:text` (contains) and `:code-text` (starts-with) match the
888            // token's display text (Coding.display / CodeableConcept.text).
889            Some(m @ (SearchModifier::Text | SearchModifier::CodeText)) => {
890                let escaped = regex_escape(&value.value);
891                let regex = if *m == SearchModifier::CodeText {
892                    format!("^{}", escaped)
893                } else {
894                    escaped
895                };
896                return Ok(doc! {
897                    "value_token_display": { "$regex": regex, "$options": "i" }
898                });
899            }
900            Some(other) => {
901                return Err(StorageError::Search(SearchError::UnsupportedModifier {
902                    modifier: other.to_string(),
903                    param_type: "token".to_string(),
904                }));
905            }
906        }
907
908        if let Some((system, code)) = value.value.split_once('|') {
909            if system.is_empty() {
910                Ok(doc! { "value_token_code": code })
911            } else if code.is_empty() {
912                Ok(doc! { "value_token_system": system })
913            } else {
914                Ok(doc! {
915                    "value_token_system": system,
916                    "value_token_code": code,
917                })
918            }
919        } else {
920            Ok(doc! { "value_token_code": &value.value })
921        }
922    }
923
924    fn build_reference_filter(
925        &self,
926        param: &SearchParameter,
927        value: &SearchValue,
928    ) -> StorageResult<Document> {
929        if value.prefix != SearchPrefix::Eq {
930            return Err(StorageError::Search(SearchError::QueryParseError {
931                message: format!(
932                    "Unsupported prefix '{}' for reference parameter '{}'",
933                    value.prefix, param.name
934                ),
935            }));
936        }
937
938        // :contains - case-insensitive substring match on the stored reference.
939        if matches!(param.modifier.as_ref(), Some(SearchModifier::Contains)) {
940            return Ok(doc! {
941                "value_reference": {
942                    "$regex": regex_escape(&value.value),
943                    "$options": "i"
944                }
945            });
946        }
947
948        // :text (contains) / :code-text (starts-with) match Reference.display.
949        if matches!(
950            param.modifier.as_ref(),
951            Some(SearchModifier::Text | SearchModifier::CodeText)
952        ) {
953            let escaped = regex_escape(&value.value);
954            let regex = if matches!(param.modifier.as_ref(), Some(SearchModifier::CodeText)) {
955                format!("^{}", escaped)
956            } else {
957                escaped
958            };
959            return Ok(doc! {
960                "value_reference_display": { "$regex": regex, "$options": "i" }
961            });
962        }
963
964        if let Some(modifier) = &param.modifier {
965            return Err(StorageError::Search(SearchError::UnsupportedModifier {
966                modifier: modifier.to_string(),
967                param_type: "reference".to_string(),
968            }));
969        }
970
971        if value.value.contains('/') {
972            return Ok(doc! { "value_reference": &value.value });
973        }
974
975        Ok(doc! {
976            "$or": [
977                { "value_reference": &value.value },
978                {
979                    "value_reference": {
980                        "$regex": format!("/{}$", regex_escape(&value.value))
981                    }
982                }
983            ]
984        })
985    }
986
987    fn build_uri_filter(
988        &self,
989        param: &SearchParameter,
990        value: &SearchValue,
991    ) -> StorageResult<Document> {
992        if value.prefix != SearchPrefix::Eq {
993            return Err(StorageError::Search(SearchError::QueryParseError {
994                message: format!(
995                    "Unsupported prefix '{}' for uri parameter '{}'",
996                    value.prefix, param.name
997                ),
998            }));
999        }
1000
1001        match param.modifier.as_ref() {
1002            None | Some(SearchModifier::Exact) => Ok(doc! { "value_uri": &value.value }),
1003            Some(SearchModifier::Contains) => Ok(doc! {
1004                "value_uri": {
1005                    "$regex": regex_escape(&value.value)
1006                }
1007            }),
1008            Some(other) => Err(StorageError::Search(SearchError::UnsupportedModifier {
1009                modifier: other.to_string(),
1010                param_type: "uri".to_string(),
1011            })),
1012        }
1013    }
1014
1015    fn build_date_filter(&self, value: &SearchValue, field: &str) -> StorageResult<Document> {
1016        let parsed = parse_date_for_query(&value.value).ok_or_else(|| {
1017            StorageError::Search(SearchError::QueryParseError {
1018                message: format!("Invalid date value '{}'", value.value),
1019            })
1020        })?;
1021
1022        let bson_date = chrono_to_bson(parsed);
1023
1024        match value.prefix {
1025            SearchPrefix::Ap => {
1026                let lower = chrono_to_bson(parsed - chrono::Duration::hours(12));
1027                let upper = chrono_to_bson(parsed + chrono::Duration::hours(12));
1028                Ok(doc! {
1029                    field: {
1030                        "$gte": lower,
1031                        "$lte": upper,
1032                    }
1033                })
1034            }
1035            _ => {
1036                let op = Self::prefix_to_mongo_operator(value.prefix)?;
1037                Ok(doc! {
1038                    field: {
1039                        op: bson_date,
1040                    }
1041                })
1042            }
1043        }
1044    }
1045
1046    /// Builds a MongoDB filter for a quantity parameter.
1047    ///
1048    /// Value form: `[prefix]number[|system|code]` (or the `number|code` shorthand).
1049    /// The comparison runs on `value_quantity_value`; an optional system/code
1050    /// further constrain `value_quantity_system` / `value_quantity_unit` (the
1051    /// extractor stores the quantity code under the unit field).
1052    fn build_quantity_filter(&self, value: &SearchValue) -> StorageResult<Document> {
1053        let parts: Vec<&str> = value.value.splitn(3, '|').collect();
1054        let parsed = parts[0].parse::<f64>().map_err(|e| {
1055            StorageError::Search(SearchError::QueryParseError {
1056                message: format!("Invalid quantity value '{}': {}", value.value, e),
1057            })
1058        })?;
1059
1060        let value_condition = match value.prefix {
1061            SearchPrefix::Ap => {
1062                let delta = (parsed.abs() * 0.1).max(0.1);
1063                doc! { "$gte": parsed - delta, "$lte": parsed + delta }
1064            }
1065            _ => {
1066                let op = Self::prefix_to_mongo_operator(value.prefix)?;
1067                doc! { op: parsed }
1068            }
1069        };
1070
1071        let mut filter = doc! { "value_quantity_value": value_condition };
1072        match parts.as_slice() {
1073            // number|system|code
1074            [_, system, code] => {
1075                if !system.is_empty() {
1076                    filter.insert("value_quantity_system", *system);
1077                }
1078                if !code.is_empty() {
1079                    filter.insert("value_quantity_unit", *code);
1080                }
1081            }
1082            // number|code shorthand
1083            [_, code] => {
1084                if !code.is_empty() {
1085                    filter.insert("value_quantity_unit", *code);
1086                }
1087            }
1088            _ => {}
1089        }
1090
1091        Ok(filter)
1092    }
1093
1094    fn build_number_filter(&self, value: &SearchValue) -> StorageResult<Document> {
1095        let parsed = value.value.parse::<f64>().map_err(|e| {
1096            StorageError::Search(SearchError::QueryParseError {
1097                message: format!("Invalid number value '{}': {}", value.value, e),
1098            })
1099        })?;
1100
1101        match value.prefix {
1102            SearchPrefix::Ap => {
1103                let delta = (parsed.abs() * 0.1).max(0.1);
1104                Ok(doc! {
1105                    "value_number": {
1106                        "$gte": parsed - delta,
1107                        "$lte": parsed + delta,
1108                    }
1109                })
1110            }
1111            _ => {
1112                let op = Self::prefix_to_mongo_operator(value.prefix)?;
1113                Ok(doc! {
1114                    "value_number": {
1115                        op: parsed,
1116                    }
1117                })
1118            }
1119        }
1120    }
1121
1122    fn prefix_to_mongo_operator(prefix: SearchPrefix) -> StorageResult<&'static str> {
1123        match prefix {
1124            SearchPrefix::Eq => Ok("$eq"),
1125            SearchPrefix::Ne => Ok("$ne"),
1126            SearchPrefix::Gt | SearchPrefix::Sa => Ok("$gt"),
1127            SearchPrefix::Lt | SearchPrefix::Eb => Ok("$lt"),
1128            SearchPrefix::Ge => Ok("$gte"),
1129            SearchPrefix::Le => Ok("$lte"),
1130            SearchPrefix::Ap => Ok("$eq"),
1131        }
1132    }
1133
1134    fn build_resource_filter(
1135        &self,
1136        tenant_id: &str,
1137        resource_type: &str,
1138        query: &SearchQuery,
1139        matched_ids: Option<&HashSet<String>>,
1140        cursor: Option<&PageCursor>,
1141    ) -> StorageResult<Document> {
1142        let mut conditions = vec![doc! {
1143            "tenant_id": tenant_id,
1144            "resource_type": resource_type,
1145            "is_deleted": false,
1146        }];
1147
1148        if let Some(ids) = matched_ids {
1149            let id_values = ids.iter().cloned().map(Bson::String).collect::<Vec<_>>();
1150            conditions.push(doc! {
1151                "id": { "$in": Bson::Array(id_values) }
1152            });
1153        }
1154
1155        for param in &query.parameters {
1156            match param.name.as_str() {
1157                "_id" => {
1158                    conditions.push(self.build_resource_id_condition(param)?);
1159                }
1160                "_lastUpdated" => {
1161                    conditions.extend(self.build_resource_last_updated_conditions(param)?);
1162                }
1163                _ => {}
1164            }
1165        }
1166
1167        if let Some(cursor) = cursor {
1168            conditions.push(self.build_cursor_condition(cursor)?);
1169        }
1170
1171        if conditions.len() == 1 {
1172            return Ok(conditions.remove(0));
1173        }
1174
1175        Ok(doc! {
1176            "$and": Bson::Array(conditions.into_iter().map(Bson::Document).collect())
1177        })
1178    }
1179
1180    fn build_resource_id_condition(&self, param: &SearchParameter) -> StorageResult<Document> {
1181        let mut ids = Vec::new();
1182
1183        for value in &param.values {
1184            if value.prefix != SearchPrefix::Eq {
1185                return Err(StorageError::Search(SearchError::QueryParseError {
1186                    message: format!("Unsupported prefix '{}' for _id parameter", value.prefix),
1187                }));
1188            }
1189            ids.push(value.value.clone());
1190        }
1191
1192        if ids.len() == 1 {
1193            return Ok(doc! { "id": ids.remove(0) });
1194        }
1195
1196        Ok(doc! {
1197            "id": { "$in": Bson::Array(ids.into_iter().map(Bson::String).collect()) }
1198        })
1199    }
1200
1201    fn build_resource_last_updated_conditions(
1202        &self,
1203        param: &SearchParameter,
1204    ) -> StorageResult<Vec<Document>> {
1205        param
1206            .values
1207            .iter()
1208            .map(|value| self.build_date_filter(value, "last_updated"))
1209            .collect()
1210    }
1211
1212    fn build_cursor_condition(&self, cursor: &PageCursor) -> StorageResult<Document> {
1213        let timestamp = match cursor.sort_values().first() {
1214            Some(CursorValue::String(value)) => DateTime::parse_from_rfc3339(value)
1215                .map_err(|_| {
1216                    StorageError::Search(SearchError::InvalidCursor {
1217                        cursor: cursor.encode(),
1218                    })
1219                })?
1220                .with_timezone(&Utc),
1221            _ => {
1222                return Err(StorageError::Search(SearchError::InvalidCursor {
1223                    cursor: cursor.encode(),
1224                }));
1225            }
1226        };
1227
1228        let ts = chrono_to_bson(timestamp);
1229        let id = cursor.resource_id().to_string();
1230
1231        if cursor.direction() == CursorDirection::Previous {
1232            Ok(doc! {
1233                "$or": [
1234                    { "last_updated": { "$gt": ts } },
1235                    { "last_updated": ts, "id": { "$gt": id } }
1236                ]
1237            })
1238        } else {
1239            Ok(doc! {
1240                "$or": [
1241                    { "last_updated": { "$lt": ts } },
1242                    { "last_updated": ts, "id": { "$lt": id } }
1243                ]
1244            })
1245        }
1246    }
1247
1248    fn build_sort_document(
1249        &self,
1250        query: &SearchQuery,
1251        previous_mode: bool,
1252    ) -> StorageResult<Document> {
1253        if query.sort.is_empty() {
1254            return Ok(if previous_mode {
1255                doc! { "last_updated": 1_i32, "id": 1_i32 }
1256            } else {
1257                doc! { "last_updated": -1_i32, "id": -1_i32 }
1258            });
1259        }
1260
1261        let mut sort = Document::new();
1262
1263        for directive in &query.sort {
1264            let field = match directive.parameter.as_str() {
1265                "_lastUpdated" => "last_updated",
1266                "_id" | "id" => "id",
1267                other => {
1268                    return Err(StorageError::Search(
1269                        SearchError::UnsupportedParameterType {
1270                            param_type: format!("sort parameter '{}'", other),
1271                        },
1272                    ));
1273                }
1274            };
1275
1276            let mut dir = if directive.direction == crate::types::SortDirection::Descending {
1277                -1_i32
1278            } else {
1279                1_i32
1280            };
1281
1282            if previous_mode {
1283                dir = -dir;
1284            }
1285
1286            sort.insert(field, dir);
1287        }
1288
1289        if !sort.contains_key("id") {
1290            sort.insert("id", if previous_mode { 1_i32 } else { -1_i32 });
1291        }
1292
1293        Ok(sort)
1294    }
1295
1296    fn document_to_stored_resource(
1297        &self,
1298        tenant: &TenantContext,
1299        fallback_resource_type: &str,
1300        doc: Document,
1301    ) -> StorageResult<StoredResource> {
1302        let resource_type = doc
1303            .get_str("resource_type")
1304            .ok()
1305            .unwrap_or(fallback_resource_type)
1306            .to_string();
1307
1308        let id = doc
1309            .get_str("id")
1310            .map_err(|e| internal_error(format!("Missing resource id in search result: {}", e)))?
1311            .to_string();
1312
1313        let version_id = doc
1314            .get_str("version_id")
1315            .map_err(|e| internal_error(format!("Missing version_id in search result: {}", e)))?
1316            .to_string();
1317
1318        let payload = doc.get_document("data").map_err(|e| {
1319            internal_error(format!("Missing resource payload in search result: {}", e))
1320        })?;
1321
1322        let content = bson::from_bson::<Value>(Bson::Document(payload.clone())).map_err(|e| {
1323            serialization_error(format!("Failed to deserialize resource payload: {}", e))
1324        })?;
1325
1326        let now = Utc::now();
1327        let created_at = doc
1328            .get_datetime("created_at")
1329            .map(bson_to_chrono)
1330            .unwrap_or(now);
1331
1332        let last_updated = doc
1333            .get_datetime("last_updated")
1334            .map(bson_to_chrono)
1335            .unwrap_or(created_at);
1336
1337        let deleted_at = match doc.get("deleted_at") {
1338            Some(Bson::DateTime(value)) => Some(bson_to_chrono(value)),
1339            _ => None,
1340        };
1341
1342        let fhir_version = doc
1343            .get_str("fhir_version")
1344            .ok()
1345            .and_then(FhirVersion::from_storage)
1346            .unwrap_or_else(FhirVersion::default_enabled);
1347
1348        Ok(StoredResource::from_storage(
1349            resource_type,
1350            id,
1351            version_id,
1352            tenant.tenant_id().clone(),
1353            content,
1354            created_at,
1355            last_updated,
1356            deleted_at,
1357            fhir_version,
1358        ))
1359    }
1360
1361    async fn find_matching_resources(
1362        &self,
1363        tenant: &TenantContext,
1364        resource_type: &str,
1365        search_params_str: &str,
1366    ) -> StorageResult<Vec<StoredResource>> {
1367        let parsed_params = parse_simple_search_params(search_params_str);
1368
1369        if parsed_params.is_empty() {
1370            return Ok(Vec::new());
1371        }
1372
1373        let search_params = self.build_search_parameters(resource_type, &parsed_params);
1374
1375        let query = SearchQuery {
1376            resource_type: resource_type.to_string(),
1377            parameters: search_params,
1378            count: Some(1000),
1379            ..Default::default()
1380        };
1381
1382        let result = <Self as SearchProvider>::search(self, tenant, &query).await?;
1383        Ok(result.resources.items)
1384    }
1385
1386    fn build_search_parameters(
1387        &self,
1388        resource_type: &str,
1389        params: &[(String, String)],
1390    ) -> Vec<SearchParameter> {
1391        let registry = self.search_registry().read();
1392
1393        params
1394            .iter()
1395            .map(|(name, value)| {
1396                let values = vec![SearchValue::parse(value)];
1397                let param_type =
1398                    crate::search::resolve_param_type(&registry, resource_type, name, &values);
1399
1400                SearchParameter {
1401                    name: name.clone(),
1402                    param_type,
1403                    modifier: None,
1404                    values,
1405                    chain: vec![],
1406                    components: vec![],
1407                }
1408            })
1409            .collect()
1410    }
1411
1412    fn merge_unique(target: &mut Vec<StoredResource>, additions: Vec<StoredResource>) {
1413        let mut seen: HashSet<String> = target
1414            .iter()
1415            .map(|r| format!("{}/{}", r.resource_type(), r.id()))
1416            .collect();
1417        for resource in additions {
1418            let key = format!("{}/{}", resource.resource_type(), resource.id());
1419            if seen.insert(key) {
1420                target.push(resource);
1421            }
1422        }
1423    }
1424
1425    fn extract_references(content: &Value, search_param: &str) -> Vec<String> {
1426        let mut refs = Vec::new();
1427        if let Some(value) = content.get(search_param) {
1428            Self::collect_references_from_value(value, &mut refs);
1429        }
1430        refs
1431    }
1432
1433    fn collect_references_from_value(value: &Value, refs: &mut Vec<String>) {
1434        match value {
1435            Value::Object(obj) => {
1436                if let Some(Value::String(reference)) = obj.get("reference") {
1437                    refs.push(reference.clone());
1438                }
1439                for v in obj.values() {
1440                    Self::collect_references_from_value(v, refs);
1441                }
1442            }
1443            Value::Array(arr) => {
1444                for item in arr {
1445                    Self::collect_references_from_value(item, refs);
1446                }
1447            }
1448            _ => {}
1449        }
1450    }
1451
1452    fn parse_reference(reference: &str) -> Option<(String, String)> {
1453        let trimmed = reference
1454            .strip_prefix("http://")
1455            .or_else(|| reference.strip_prefix("https://"))
1456            .unwrap_or(reference);
1457
1458        let mut segments: Vec<&str> = trimmed.split('/').filter(|s| !s.is_empty()).collect();
1459        if segments.len() < 2 {
1460            return None;
1461        }
1462
1463        let id = segments.pop()?.to_string();
1464        let resource_type = segments.pop()?.to_string();
1465
1466        if !resource_type
1467            .chars()
1468            .next()
1469            .is_some_and(|c| c.is_ascii_uppercase())
1470        {
1471            return None;
1472        }
1473
1474        Some((resource_type, id))
1475    }
1476
1477    async fn fetch_resource_by_type_id(
1478        &self,
1479        tenant: &TenantContext,
1480        resource_type: &str,
1481        id: &str,
1482    ) -> StorageResult<Option<StoredResource>> {
1483        let db = self.get_database().await?;
1484        let resources = db.collection::<Document>(MongoBackend::RESOURCES_COLLECTION);
1485        let tenant_id = tenant.tenant_id().as_str();
1486
1487        let doc = resources
1488            .find_one(doc! {
1489                "tenant_id": tenant_id,
1490                "resource_type": resource_type,
1491                "id": id,
1492                "is_deleted": false,
1493            })
1494            .await
1495            .map_err(|e| internal_error(format!("Failed to fetch included resource: {}", e)))?;
1496
1497        match doc {
1498            Some(doc) => Ok(Some(self.document_to_stored_resource(
1499                tenant,
1500                resource_type,
1501                doc,
1502            )?)),
1503            None => Ok(None),
1504        }
1505    }
1506}
1507
1508#[async_trait]
1509impl IncludeProvider for MongoBackend {
1510    async fn resolve_includes(
1511        &self,
1512        tenant: &TenantContext,
1513        resources: &[StoredResource],
1514        includes: &[IncludeDirective],
1515    ) -> StorageResult<Vec<StoredResource>> {
1516        if resources.is_empty() || includes.is_empty() {
1517            return Ok(Vec::new());
1518        }
1519
1520        let mut included = Vec::new();
1521        let mut seen: HashSet<String> = HashSet::new();
1522
1523        for include in includes {
1524            for resource in resources {
1525                if resource.resource_type() != include.source_type {
1526                    continue;
1527                }
1528
1529                let refs = Self::extract_references(resource.content(), &include.search_param);
1530                for reference in refs {
1531                    let Some((ref_type, ref_id)) = Self::parse_reference(&reference) else {
1532                        continue;
1533                    };
1534
1535                    if let Some(target) = include.target_type.as_ref() {
1536                        if ref_type != *target {
1537                            continue;
1538                        }
1539                    }
1540
1541                    let key = format!("{}/{}", ref_type, ref_id);
1542                    if !seen.insert(key) {
1543                        continue;
1544                    }
1545
1546                    if let Some(stored) = self
1547                        .fetch_resource_by_type_id(tenant, &ref_type, &ref_id)
1548                        .await?
1549                    {
1550                        included.push(stored);
1551                    }
1552                }
1553            }
1554        }
1555
1556        Ok(included)
1557    }
1558}
1559
1560#[async_trait]
1561impl RevincludeProvider for MongoBackend {
1562    async fn resolve_revincludes(
1563        &self,
1564        tenant: &TenantContext,
1565        resources: &[StoredResource],
1566        revincludes: &[IncludeDirective],
1567    ) -> StorageResult<Vec<StoredResource>> {
1568        if resources.is_empty() || revincludes.is_empty() {
1569            return Ok(Vec::new());
1570        }
1571
1572        let db = self.get_database().await?;
1573        let tenant_id = tenant.tenant_id().as_str();
1574        let search_index = db.collection::<Document>(MongoBackend::SEARCH_INDEX_COLLECTION);
1575        let resources_collection = db.collection::<Document>(MongoBackend::RESOURCES_COLLECTION);
1576
1577        let mut included = Vec::new();
1578        let mut seen: HashSet<String> = HashSet::new();
1579
1580        for revinclude in revincludes {
1581            if revinclude.source_type.is_empty() {
1582                continue;
1583            }
1584
1585            let mut reference_values: Vec<String> = Vec::with_capacity(resources.len() * 2);
1586            for resource in resources {
1587                reference_values.push(format!("{}/{}", resource.resource_type(), resource.id()));
1588                reference_values.push(resource.id().to_string());
1589            }
1590            reference_values.sort();
1591            reference_values.dedup();
1592
1593            if reference_values.is_empty() {
1594                continue;
1595            }
1596
1597            let bson_values: Vec<Bson> = reference_values.into_iter().map(Bson::String).collect();
1598            let index_filter = doc! {
1599                "tenant_id": tenant_id,
1600                "resource_type": &revinclude.source_type,
1601                "param_name": &revinclude.search_param,
1602                "value_reference": { "$in": Bson::Array(bson_values) },
1603            };
1604
1605            let matching_ids: Vec<String> = search_index
1606                .distinct("resource_id", index_filter)
1607                .await
1608                .map_err(|e| {
1609                    internal_error(format!(
1610                        "Failed to query search_index for revinclude: {}",
1611                        e
1612                    ))
1613                })?
1614                .into_iter()
1615                .filter_map(|value| value.as_str().map(ToString::to_string))
1616                .collect();
1617
1618            if matching_ids.is_empty() {
1619                continue;
1620            }
1621
1622            let id_bson: Vec<Bson> = matching_ids.into_iter().map(Bson::String).collect();
1623            let resource_filter = doc! {
1624                "tenant_id": tenant_id,
1625                "resource_type": &revinclude.source_type,
1626                "is_deleted": false,
1627                "id": { "$in": Bson::Array(id_bson) },
1628            };
1629
1630            let docs =
1631                collect_documents(resources_collection.find(resource_filter).await.map_err(
1632                    |e| internal_error(format!("Failed to fetch revinclude resources: {}", e)),
1633                )?)
1634                .await?;
1635
1636            for doc in docs {
1637                let stored =
1638                    self.document_to_stored_resource(tenant, &revinclude.source_type, doc)?;
1639                let key = format!("{}/{}", stored.resource_type(), stored.id());
1640                if seen.insert(key) {
1641                    included.push(stored);
1642                }
1643            }
1644        }
1645
1646        Ok(included)
1647    }
1648}