1use std::collections::HashSet;
4
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use helios_fhir::FhirVersion;
8use mongodb::{
9 Cursor,
10 bson::{self, Bson, DateTime as BsonDateTime, Document, doc},
11};
12use regex::escape as regex_escape;
13use serde_json::Value;
14
15use crate::core::{
16 ConditionalCreateResult, ConditionalDeleteResult, ConditionalPatchResult, ConditionalStorage,
17 ConditionalUpdateResult, IncludeProvider, PatchFormat, ResourceStorage, RevincludeProvider,
18 SearchProvider, SearchResult,
19};
20use crate::error::{BackendError, SearchError, StorageError, StorageResult};
21use crate::tenant::TenantContext;
22use crate::types::{
23 CompartmentMembership, CursorDirection, CursorValue, IncludeDirective, IncludeType, Page,
24 PageCursor, PageInfo, SearchModifier, SearchParamType, SearchParameter, SearchPrefix,
25 SearchQuery, SearchValue, StoredResource, strip_reference_version,
26};
27
28use super::MongoBackend;
29
30fn internal_error(message: String) -> StorageError {
31 StorageError::Backend(BackendError::Internal {
32 backend_name: "mongodb".to_string(),
33 message,
34 source: None,
35 })
36}
37
38fn serialization_error(message: String) -> StorageError {
39 StorageError::Backend(BackendError::SerializationError { message })
40}
41
42fn bson_to_chrono(dt: &BsonDateTime) -> DateTime<Utc> {
43 DateTime::<Utc>::from_timestamp_millis(dt.timestamp_millis()).unwrap_or_else(Utc::now)
44}
45
46fn chrono_to_bson(dt: DateTime<Utc>) -> BsonDateTime {
47 BsonDateTime::from_millis(dt.timestamp_millis())
48}
49
50fn parse_date_for_query(value: &str) -> Option<DateTime<Utc>> {
51 let normalized = if value.contains('T') {
52 if value.contains('Z') || value.contains('+') || value.matches('-').count() > 2 {
53 value.to_string()
54 } else {
55 format!("{}+00:00", value)
56 }
57 } else if value.len() == 10 {
58 format!("{}T00:00:00+00:00", value)
59 } else if value.len() == 7 {
60 format!("{}-01T00:00:00+00:00", value)
61 } else if value.len() == 4 {
62 format!("{}-01-01T00:00:00+00:00", value)
63 } else {
64 value.to_string()
65 };
66
67 DateTime::parse_from_rfc3339(&normalized)
68 .ok()
69 .map(|dt| dt.with_timezone(&Utc))
70}
71
72async fn collect_documents(mut cursor: Cursor<Document>) -> StorageResult<Vec<Document>> {
73 let mut docs = Vec::new();
74 while cursor
75 .advance()
76 .await
77 .map_err(|e| internal_error(format!("Failed to advance MongoDB cursor: {}", e)))?
78 {
79 let doc = cursor.deserialize_current().map_err(|e| {
80 internal_error(format!("Failed to deserialize MongoDB document: {}", e))
81 })?;
82 docs.push(doc);
83 }
84 Ok(docs)
85}
86
87fn extract_contained_resource(content: &Value, local_id: &str) -> Option<Value> {
90 content
91 .get("contained")?
92 .as_array()?
93 .iter()
94 .find(|e| e.get("id").and_then(|v| v.as_str()) == Some(local_id))
95 .cloned()
96}
97
98fn build_contained_stored(
101 container: &StoredResource,
102 contained_type: &str,
103 local_id: &str,
104 content: Value,
105) -> StoredResource {
106 StoredResource::from_storage(
107 contained_type.to_string(),
108 local_id.to_string(),
109 container.version_id().to_string(),
110 container.tenant_id().clone(),
111 content,
112 container.created_at(),
113 container.last_modified(),
114 None,
115 container.fhir_version(),
116 )
117}
118
119fn parse_simple_search_params(params: &str) -> Vec<(String, String)> {
120 params
121 .split('&')
122 .filter_map(|pair| {
123 let (name, value) = pair.split_once('=')?;
124 Some((name.to_string(), value.to_string()))
125 })
126 .collect()
127}
128
129#[async_trait]
130impl SearchProvider for MongoBackend {
131 async fn search(
132 &self,
133 tenant: &TenantContext,
134 query: &SearchQuery,
135 ) -> StorageResult<SearchResult> {
136 if query.contained != crate::types::ContainedMode::Off {
141 return self.search_contained(tenant, query).await;
142 }
143
144 self.validate_query_support(query)?;
145
146 let db = self.get_database().await?;
147 let resources = db.collection::<Document>(MongoBackend::RESOURCES_COLLECTION);
148 let tenant_id = tenant.tenant_id().as_str();
149
150 let cursor = if let Some(cursor_str) = &query.cursor {
151 Some(PageCursor::decode(cursor_str).map_err(|_| {
152 StorageError::Search(SearchError::InvalidCursor {
153 cursor: cursor_str.clone(),
154 })
155 })?)
156 } else {
157 None
158 };
159
160 if cursor.is_some() && !query.sort.is_empty() {
161 return Err(StorageError::Search(SearchError::QueryParseError {
162 message:
163 "MongoDB cursor pagination currently supports only default _lastUpdated sort"
164 .to_string(),
165 }));
166 }
167
168 let previous_mode = cursor
169 .as_ref()
170 .is_some_and(|c| c.direction() == CursorDirection::Previous);
171
172 let matched_ids = self
173 .matching_resource_ids(&db, tenant_id, &query.resource_type, query)
174 .await?;
175
176 let filter = self.build_resource_filter(
177 tenant_id,
178 &query.resource_type,
179 query,
180 matched_ids.as_ref(),
181 cursor.as_ref(),
182 )?;
183
184 let sort = self.build_sort_document(query, previous_mode)?;
185 let page_size = query.count.unwrap_or(100).max(1) as usize;
186
187 let mut find_action = resources
188 .find(filter)
189 .sort(sort)
190 .limit((page_size + 1) as i64);
191
192 if cursor.is_none() {
193 if let Some(offset) = query.offset {
194 find_action = find_action.skip(offset as u64);
195 }
196 }
197
198 let docs = collect_documents(
199 find_action
200 .await
201 .map_err(|e| internal_error(format!("Failed to execute MongoDB search: {}", e)))?,
202 )
203 .await?;
204
205 let mut resources = docs
206 .into_iter()
207 .map(|doc| self.document_to_stored_resource(tenant, &query.resource_type, doc))
208 .collect::<StorageResult<Vec<_>>>()?;
209
210 if previous_mode {
211 resources.reverse();
212 }
213
214 let has_next = resources.len() > page_size;
215 if has_next {
216 let _ = resources.pop();
217 }
218
219 let has_previous = cursor.is_some() || query.offset.unwrap_or(0) > 0;
220
221 let next_cursor = if has_next {
222 resources.last().map(|resource| {
223 PageCursor::new(
224 vec![CursorValue::String(resource.last_modified().to_rfc3339())],
225 resource.id(),
226 )
227 .encode()
228 })
229 } else {
230 None
231 };
232
233 let previous_cursor = if has_previous {
234 resources.first().map(|resource| {
235 PageCursor::previous(
236 vec![CursorValue::String(resource.last_modified().to_rfc3339())],
237 resource.id(),
238 )
239 .encode()
240 })
241 } else {
242 None
243 };
244
245 let total = if query.total.is_some() {
246 Some(self.search_count(tenant, query).await?)
247 } else {
248 None
249 };
250
251 let page_info = PageInfo {
252 next_cursor,
253 previous_cursor,
254 total,
255 has_next,
256 has_previous,
257 };
258
259 let page = Page::new(resources, page_info);
260 let mut included: Vec<StoredResource> = Vec::new();
261
262 if !query.includes.is_empty() {
263 let forward: Vec<IncludeDirective> = query
264 .includes
265 .iter()
266 .filter(|i| i.include_type == IncludeType::Include)
267 .cloned()
268 .collect();
269 if !forward.is_empty() {
270 let resolved = self.resolve_includes(tenant, &page.items, &forward).await?;
271 Self::merge_unique(&mut included, resolved);
272 }
273
274 let reverse: Vec<IncludeDirective> = query
275 .includes
276 .iter()
277 .filter(|i| i.include_type == IncludeType::Revinclude)
278 .cloned()
279 .collect();
280 if !reverse.is_empty() {
281 let resolved = self
282 .resolve_revincludes(tenant, &page.items, &reverse)
283 .await?;
284 Self::merge_unique(&mut included, resolved);
285 }
286 }
287
288 Ok(SearchResult {
289 resources: page,
290 included,
291 total,
292 scores: Default::default(),
293 })
294 }
295
296 async fn search_count(
297 &self,
298 tenant: &TenantContext,
299 query: &SearchQuery,
300 ) -> StorageResult<u64> {
301 self.validate_query_support(query)?;
302
303 let db = self.get_database().await?;
304 let resources = db.collection::<Document>(MongoBackend::RESOURCES_COLLECTION);
305 let tenant_id = tenant.tenant_id().as_str();
306
307 let matched_ids = self
308 .matching_resource_ids(&db, tenant_id, &query.resource_type, query)
309 .await?;
310
311 let filter = self.build_resource_filter(
312 tenant_id,
313 &query.resource_type,
314 query,
315 matched_ids.as_ref(),
316 None,
317 )?;
318
319 resources
320 .count_documents(filter)
321 .await
322 .map_err(|e| internal_error(format!("Failed to count MongoDB search results: {}", e)))
323 }
324
325 fn search_param_registry(
326 &self,
327 ) -> &std::sync::Arc<parking_lot::RwLock<crate::search::SearchParameterRegistry>> {
328 self.search_registry()
329 }
330
331 fn supports_contained_search(&self) -> bool {
332 true
333 }
334
335 fn modifiers_for_param_type(
336 &self,
337 param_type: crate::types::SearchParamType,
338 ) -> Vec<&'static str> {
339 Self::modifiers_for_type(param_type)
340 }
341}
342
343#[async_trait]
344impl ConditionalStorage for MongoBackend {
345 async fn conditional_create(
346 &self,
347 tenant: &TenantContext,
348 resource_type: &str,
349 resource: Value,
350 search_params: &str,
351 fhir_version: FhirVersion,
352 ) -> StorageResult<ConditionalCreateResult> {
353 let matches = self
354 .find_matching_resources(tenant, resource_type, search_params)
355 .await?;
356
357 match matches.len() {
358 0 => {
359 let created = self
360 .create(tenant, resource_type, resource, fhir_version)
361 .await?;
362 Ok(ConditionalCreateResult::Created(created))
363 }
364 1 => Ok(ConditionalCreateResult::Exists(
365 matches.into_iter().next().expect("single match must exist"),
366 )),
367 n => Ok(ConditionalCreateResult::MultipleMatches(n)),
368 }
369 }
370
371 async fn conditional_update(
372 &self,
373 tenant: &TenantContext,
374 resource_type: &str,
375 resource: Value,
376 search_params: &str,
377 upsert: bool,
378 fhir_version: FhirVersion,
379 ) -> StorageResult<ConditionalUpdateResult> {
380 let matches = self
381 .find_matching_resources(tenant, resource_type, search_params)
382 .await?;
383
384 match matches.len() {
385 0 => {
386 if upsert {
387 let created = self
388 .create(tenant, resource_type, resource, fhir_version)
389 .await?;
390 Ok(ConditionalUpdateResult::Created(created))
391 } else {
392 Ok(ConditionalUpdateResult::NoMatch)
393 }
394 }
395 1 => {
396 let current = matches.into_iter().next().expect("single match must exist");
397 let updated = self.update(tenant, ¤t, resource).await?;
398 Ok(ConditionalUpdateResult::Updated(updated))
399 }
400 n => Ok(ConditionalUpdateResult::MultipleMatches(n)),
401 }
402 }
403
404 async fn conditional_delete(
405 &self,
406 tenant: &TenantContext,
407 resource_type: &str,
408 search_params: &str,
409 ) -> StorageResult<ConditionalDeleteResult> {
410 let matches = self
411 .find_matching_resources(tenant, resource_type, search_params)
412 .await?;
413
414 match matches.len() {
415 0 => Ok(ConditionalDeleteResult::NoMatch),
416 1 => {
417 let current = matches.into_iter().next().expect("single match must exist");
418 self.delete(tenant, resource_type, current.id()).await?;
419 Ok(ConditionalDeleteResult::Deleted)
420 }
421 n => Ok(ConditionalDeleteResult::MultipleMatches(n)),
422 }
423 }
424
425 async fn conditional_patch(
426 &self,
427 tenant: &TenantContext,
428 resource_type: &str,
429 search_params: &str,
430 patch: &PatchFormat,
431 ) -> StorageResult<ConditionalPatchResult> {
432 let _ = (tenant, resource_type, search_params, patch);
433 Err(StorageError::Backend(BackendError::UnsupportedCapability {
434 backend_name: "mongodb".to_string(),
435 capability: "conditional_patch".to_string(),
436 }))
437 }
438}
439
440impl MongoBackend {
441 async fn search_contained(
446 &self,
447 tenant: &TenantContext,
448 query: &SearchQuery,
449 ) -> StorageResult<SearchResult> {
450 use crate::types::{ContainedMode, ContainedReturn};
451
452 let db = self.get_database().await?;
453 let tenant_id = tenant.tenant_id().as_str();
454 let contained_type = query.resource_type.as_str();
455
456 let matches = self
457 .matching_contained(&db, tenant_id, contained_type, query)
458 .await?;
459
460 let mut items: Vec<StoredResource> = Vec::new();
461 let mut seen: HashSet<String> = HashSet::new();
462 match query.contained_return {
463 ContainedReturn::Container => {
464 for (ctype, cid, _) in &matches {
465 if !seen.insert(format!("{ctype}/{cid}")) {
466 continue;
467 }
468 if let Some(container) = self.read(tenant, ctype, cid).await? {
469 items.push(container);
470 }
471 }
472 }
473 ContainedReturn::Contained => {
474 for (ctype, cid, local) in &matches {
475 let Some(local_id) = local else { continue };
476 if !seen.insert(format!("{ctype}/{cid}#{local_id}")) {
477 continue;
478 }
479 if let Some(container) = self.read(tenant, ctype, cid).await? {
480 if let Some(c) = extract_contained_resource(container.content(), local_id) {
481 items.push(build_contained_stored(
482 &container,
483 contained_type,
484 local_id,
485 c,
486 ));
487 }
488 }
489 }
490 }
491 }
492
493 if query.contained == ContainedMode::Both {
494 let mut top_query = query.clone();
495 top_query.contained = ContainedMode::Off;
496 top_query.contained_return = ContainedReturn::Container;
497 let top = self.search(tenant, &top_query).await?;
498 let mut merged = top.resources.items;
499 let top_urls: HashSet<String> = merged.iter().map(|r| r.url()).collect();
500 for item in items {
501 if !top_urls.contains(&item.url()) {
502 merged.push(item);
503 }
504 }
505 items = merged;
506 }
507
508 let count = query.count.unwrap_or(100) as usize;
509 let offset = query.offset.unwrap_or(0) as usize;
510 let total = if query.total.is_some() {
511 Some(items.len() as u64)
512 } else {
513 None
514 };
515 let windowed: Vec<StoredResource> = items.into_iter().skip(offset).take(count).collect();
516 let page = Page::new(windowed, PageInfo::end());
517 let mut result = SearchResult::new(page);
518 if let Some(t) = total {
519 result = result.with_total(t);
520 }
521 Ok(result)
522 }
523
524 async fn matching_contained(
530 &self,
531 db: &mongodb::Database,
532 tenant_id: &str,
533 contained_type: &str,
534 query: &SearchQuery,
535 ) -> StorageResult<Vec<(String, String, Option<String>)>> {
536 let search_index = db.collection::<Document>(MongoBackend::SEARCH_INDEX_COLLECTION);
537
538 let mut branches: Vec<Bson> = Vec::new();
539 let mut distinct_names: Vec<String> = Vec::new();
540 for param in &query.parameters {
541 if param.name.starts_with('_')
542 || matches!(
543 param.param_type,
544 SearchParamType::Composite | SearchParamType::Special
545 )
546 {
547 continue;
548 }
549 let mut branch = self.build_search_index_filter("", "", param)?;
552 branch.remove("tenant_id");
553 branch.remove("resource_type");
554 branches.push(Bson::Document(branch));
555 if !distinct_names.contains(¶m.name) {
556 distinct_names.push(param.name.clone());
557 }
558 }
559 if branches.is_empty() {
560 return Ok(Vec::new());
561 }
562
563 let mut pipeline = vec![
564 doc! { "$match": {
565 "tenant_id": tenant_id,
566 "is_contained": true,
567 "contained_type": contained_type,
568 "$or": branches,
569 }},
570 doc! { "$group": {
571 "_id": {
572 "rtype": "$resource_type",
573 "rid": "$resource_id",
574 "lid": "$contained_local_id",
575 },
576 "names": { "$addToSet": "$param_name" },
577 }},
578 ];
579 if distinct_names.len() > 1 {
580 pipeline.push(doc! { "$match": { "names": { "$all": distinct_names } } });
581 }
582
583 let cursor = search_index
584 .aggregate(pipeline)
585 .await
586 .map_err(|e| internal_error(format!("Failed to aggregate contained search: {}", e)))?;
587 let docs = collect_documents(cursor).await?;
588
589 let mut out = Vec::new();
590 for doc in docs {
591 if let Ok(id) = doc.get_document("_id") {
592 let rtype = id.get_str("rtype").unwrap_or_default().to_string();
593 let rid = id.get_str("rid").unwrap_or_default().to_string();
594 let lid = id.get_str("lid").ok().map(ToString::to_string);
595 if !rtype.is_empty() && !rid.is_empty() {
596 out.push((rtype, rid, lid));
597 }
598 }
599 }
600 Ok(out)
601 }
602
603 fn validate_query_support(&self, query: &SearchQuery) -> StorageResult<()> {
604 if query.parameters.iter().any(|param| !param.chain.is_empty()) {
605 return Err(StorageError::Search(
606 SearchError::ChainedSearchNotSupported {
607 chain: "forward chain".to_string(),
608 },
609 ));
610 }
611
612 if !query.reverse_chains.is_empty() {
613 return Err(StorageError::Search(SearchError::ReverseChainNotSupported));
614 }
615
616 for param in &query.parameters {
617 if matches!(
618 param.modifier,
619 Some(SearchModifier::Above)
620 | Some(SearchModifier::Below)
621 | Some(SearchModifier::In)
622 | Some(SearchModifier::NotIn)
623 ) {
624 return Err(StorageError::Search(SearchError::UnsupportedModifier {
625 modifier: param
626 .modifier
627 .as_ref()
628 .map(ToString::to_string)
629 .unwrap_or_default(),
630 param_type: param.param_type.to_string(),
631 }));
632 }
633 }
634
635 Ok(())
636 }
637
638 async fn matching_resource_ids(
639 &self,
640 db: &mongodb::Database,
641 tenant_id: &str,
642 resource_type: &str,
643 query: &SearchQuery,
644 ) -> StorageResult<Option<HashSet<String>>> {
645 let search_index = db.collection::<Document>(MongoBackend::SEARCH_INDEX_COLLECTION);
646 let mut matched: Option<HashSet<String>> = None;
647
648 for param in &query.parameters {
649 if matches!(param.name.as_str(), "_id" | "_lastUpdated") {
650 continue;
651 }
652
653 let filter = self.build_search_index_filter(tenant_id, resource_type, param)?;
654
655 let ids = search_index
656 .distinct("resource_id", filter)
657 .await
658 .map_err(|e| internal_error(format!("Failed to query search_index: {}", e)))?
659 .into_iter()
660 .filter_map(|value| value.as_str().map(ToString::to_string))
661 .collect::<HashSet<_>>();
662
663 if ids.is_empty() {
664 return Ok(Some(HashSet::new()));
665 }
666
667 matched = Some(match matched {
668 Some(current) => current
669 .intersection(&ids)
670 .cloned()
671 .collect::<HashSet<String>>(),
672 None => ids,
673 });
674
675 if matched.as_ref().is_some_and(|set| set.is_empty()) {
676 return Ok(matched);
677 }
678 }
679
680 if let Some(comp) = &query.compartment {
685 if let Some(ids) = self
686 .compartment_resource_ids(&search_index, tenant_id, resource_type, comp)
687 .await?
688 {
689 if ids.is_empty() {
690 return Ok(Some(HashSet::new()));
691 }
692 matched = Some(match matched {
693 Some(current) => current
694 .intersection(&ids)
695 .cloned()
696 .collect::<HashSet<String>>(),
697 None => ids,
698 });
699 }
700 }
701
702 Ok(matched)
703 }
704
705 async fn compartment_resource_ids(
714 &self,
715 search_index: &mongodb::Collection<Document>,
716 tenant_id: &str,
717 resource_type: &str,
718 comp: &CompartmentMembership,
719 ) -> StorageResult<Option<HashSet<String>>> {
720 if comp.params.is_empty() || comp.reference.is_empty() {
721 return Ok(None);
722 }
723
724 let base = strip_reference_version(&comp.reference);
725 let params: Vec<Bson> = comp.params.iter().cloned().map(Bson::String).collect();
726
727 let filter = doc! {
728 "tenant_id": tenant_id,
729 "resource_type": resource_type,
730 "param_name": { "$in": Bson::Array(params) },
731 "$or": [
732 { "value_reference": &base },
733 { "value_reference": { "$regex": format!("^{}/_history/", regex_escape(base)) } },
734 ],
735 };
736
737 let ids = search_index
738 .distinct("resource_id", filter)
739 .await
740 .map_err(|e| internal_error(format!("Failed to query search_index: {}", e)))?
741 .into_iter()
742 .filter_map(|value| value.as_str().map(ToString::to_string))
743 .collect::<HashSet<_>>();
744
745 Ok(Some(ids))
746 }
747
748 fn build_search_index_filter(
749 &self,
750 tenant_id: &str,
751 resource_type: &str,
752 param: &SearchParameter,
753 ) -> StorageResult<Document> {
754 if param.values.is_empty() {
755 return Err(StorageError::Search(SearchError::QueryParseError {
756 message: format!("Search parameter '{}' has no values", param.name),
757 }));
758 }
759
760 let mut filter = doc! {
761 "tenant_id": tenant_id,
762 "resource_type": resource_type,
763 "param_name": ¶m.name,
764 };
765
766 let value_filters = param
767 .values
768 .iter()
769 .map(|value| self.build_index_value_filter(param, value))
770 .collect::<StorageResult<Vec<_>>>()?;
771
772 if value_filters.len() == 1 {
773 if let Some(single) = value_filters.into_iter().next() {
774 for (key, value) in single {
775 filter.insert(key, value);
776 }
777 }
778 return Ok(filter);
779 }
780
781 let combine_with_and = matches!(
782 param.param_type,
783 SearchParamType::Date | SearchParamType::Number
784 );
785 let operator = if combine_with_and { "$and" } else { "$or" };
786 filter.insert(
787 operator,
788 Bson::Array(value_filters.into_iter().map(Bson::Document).collect()),
789 );
790
791 Ok(filter)
792 }
793
794 fn build_index_value_filter(
795 &self,
796 param: &SearchParameter,
797 value: &SearchValue,
798 ) -> StorageResult<Document> {
799 match param.name.as_str() {
800 "_text" | "_content" => {
801 return Err(StorageError::Search(SearchError::TextSearchNotAvailable));
802 }
803 "_id" | "_lastUpdated" => {
804 return Err(StorageError::Search(SearchError::QueryParseError {
805 message: format!(
806 "Special parameter '{}' should be resolved against resources, not search_index",
807 param.name
808 ),
809 }));
810 }
811 _ => {}
812 }
813
814 match param.param_type {
815 SearchParamType::String => self.build_string_filter(param, value),
816 SearchParamType::Token => self.build_token_filter(param, value),
817 SearchParamType::Date => self.build_date_filter(value, "value_date"),
818 SearchParamType::Number => self.build_number_filter(value),
819 SearchParamType::Reference => self.build_reference_filter(param, value),
820 SearchParamType::Uri => self.build_uri_filter(param, value),
821 SearchParamType::Quantity => self.build_quantity_filter(value),
822 SearchParamType::Composite => {
823 Err(StorageError::Search(SearchError::InvalidComposite {
824 message: "Composite search is not supported in MongoDB Phase 4".to_string(),
825 }))
826 }
827 SearchParamType::Special => Err(StorageError::Search(
828 SearchError::UnsupportedParameterType {
829 param_type: format!("special parameter {}", param.name),
830 },
831 )),
832 }
833 }
834
835 fn build_string_filter(
836 &self,
837 param: &SearchParameter,
838 value: &SearchValue,
839 ) -> StorageResult<Document> {
840 if value.prefix != SearchPrefix::Eq {
841 return Err(StorageError::Search(SearchError::QueryParseError {
842 message: format!(
843 "Unsupported prefix '{}' for string parameter '{}'",
844 value.prefix, param.name
845 ),
846 }));
847 }
848
849 let lowered = value.value.to_lowercase();
850 match param.modifier.as_ref() {
851 None => Ok(doc! {
852 "value_string": {
853 "$regex": format!("^{}", regex_escape(&lowered))
854 }
855 }),
856 Some(SearchModifier::Exact) => Ok(doc! { "value_string": lowered }),
857 Some(SearchModifier::Contains | SearchModifier::Text) => Ok(doc! {
860 "value_string": {
861 "$regex": regex_escape(&lowered)
862 }
863 }),
864 Some(other) => Err(StorageError::Search(SearchError::UnsupportedModifier {
865 modifier: other.to_string(),
866 param_type: "string".to_string(),
867 })),
868 }
869 }
870
871 fn build_token_filter(
872 &self,
873 param: &SearchParameter,
874 value: &SearchValue,
875 ) -> StorageResult<Document> {
876 if value.prefix != SearchPrefix::Eq {
877 return Err(StorageError::Search(SearchError::QueryParseError {
878 message: format!(
879 "Unsupported prefix '{}' for token parameter '{}'",
880 value.prefix, param.name
881 ),
882 }));
883 }
884
885 match param.modifier.as_ref() {
886 None => {}
887 Some(m @ (SearchModifier::Text | SearchModifier::CodeText)) => {
890 let escaped = regex_escape(&value.value);
891 let regex = if *m == SearchModifier::CodeText {
892 format!("^{}", escaped)
893 } else {
894 escaped
895 };
896 return Ok(doc! {
897 "value_token_display": { "$regex": regex, "$options": "i" }
898 });
899 }
900 Some(other) => {
901 return Err(StorageError::Search(SearchError::UnsupportedModifier {
902 modifier: other.to_string(),
903 param_type: "token".to_string(),
904 }));
905 }
906 }
907
908 if let Some((system, code)) = value.value.split_once('|') {
909 if system.is_empty() {
910 Ok(doc! { "value_token_code": code })
911 } else if code.is_empty() {
912 Ok(doc! { "value_token_system": system })
913 } else {
914 Ok(doc! {
915 "value_token_system": system,
916 "value_token_code": code,
917 })
918 }
919 } else {
920 Ok(doc! { "value_token_code": &value.value })
921 }
922 }
923
924 fn build_reference_filter(
925 &self,
926 param: &SearchParameter,
927 value: &SearchValue,
928 ) -> StorageResult<Document> {
929 if value.prefix != SearchPrefix::Eq {
930 return Err(StorageError::Search(SearchError::QueryParseError {
931 message: format!(
932 "Unsupported prefix '{}' for reference parameter '{}'",
933 value.prefix, param.name
934 ),
935 }));
936 }
937
938 if matches!(param.modifier.as_ref(), Some(SearchModifier::Contains)) {
940 return Ok(doc! {
941 "value_reference": {
942 "$regex": regex_escape(&value.value),
943 "$options": "i"
944 }
945 });
946 }
947
948 if matches!(
950 param.modifier.as_ref(),
951 Some(SearchModifier::Text | SearchModifier::CodeText)
952 ) {
953 let escaped = regex_escape(&value.value);
954 let regex = if matches!(param.modifier.as_ref(), Some(SearchModifier::CodeText)) {
955 format!("^{}", escaped)
956 } else {
957 escaped
958 };
959 return Ok(doc! {
960 "value_reference_display": { "$regex": regex, "$options": "i" }
961 });
962 }
963
964 if let Some(modifier) = ¶m.modifier {
965 return Err(StorageError::Search(SearchError::UnsupportedModifier {
966 modifier: modifier.to_string(),
967 param_type: "reference".to_string(),
968 }));
969 }
970
971 if value.value.contains('/') {
972 return Ok(doc! { "value_reference": &value.value });
973 }
974
975 Ok(doc! {
976 "$or": [
977 { "value_reference": &value.value },
978 {
979 "value_reference": {
980 "$regex": format!("/{}$", regex_escape(&value.value))
981 }
982 }
983 ]
984 })
985 }
986
987 fn build_uri_filter(
988 &self,
989 param: &SearchParameter,
990 value: &SearchValue,
991 ) -> StorageResult<Document> {
992 if value.prefix != SearchPrefix::Eq {
993 return Err(StorageError::Search(SearchError::QueryParseError {
994 message: format!(
995 "Unsupported prefix '{}' for uri parameter '{}'",
996 value.prefix, param.name
997 ),
998 }));
999 }
1000
1001 match param.modifier.as_ref() {
1002 None | Some(SearchModifier::Exact) => Ok(doc! { "value_uri": &value.value }),
1003 Some(SearchModifier::Contains) => Ok(doc! {
1004 "value_uri": {
1005 "$regex": regex_escape(&value.value)
1006 }
1007 }),
1008 Some(other) => Err(StorageError::Search(SearchError::UnsupportedModifier {
1009 modifier: other.to_string(),
1010 param_type: "uri".to_string(),
1011 })),
1012 }
1013 }
1014
1015 fn build_date_filter(&self, value: &SearchValue, field: &str) -> StorageResult<Document> {
1016 let parsed = parse_date_for_query(&value.value).ok_or_else(|| {
1017 StorageError::Search(SearchError::QueryParseError {
1018 message: format!("Invalid date value '{}'", value.value),
1019 })
1020 })?;
1021
1022 let bson_date = chrono_to_bson(parsed);
1023
1024 match value.prefix {
1025 SearchPrefix::Ap => {
1026 let lower = chrono_to_bson(parsed - chrono::Duration::hours(12));
1027 let upper = chrono_to_bson(parsed + chrono::Duration::hours(12));
1028 Ok(doc! {
1029 field: {
1030 "$gte": lower,
1031 "$lte": upper,
1032 }
1033 })
1034 }
1035 _ => {
1036 let op = Self::prefix_to_mongo_operator(value.prefix)?;
1037 Ok(doc! {
1038 field: {
1039 op: bson_date,
1040 }
1041 })
1042 }
1043 }
1044 }
1045
1046 fn build_quantity_filter(&self, value: &SearchValue) -> StorageResult<Document> {
1053 let parts: Vec<&str> = value.value.splitn(3, '|').collect();
1054 let parsed = parts[0].parse::<f64>().map_err(|e| {
1055 StorageError::Search(SearchError::QueryParseError {
1056 message: format!("Invalid quantity value '{}': {}", value.value, e),
1057 })
1058 })?;
1059
1060 let value_condition = match value.prefix {
1061 SearchPrefix::Ap => {
1062 let delta = (parsed.abs() * 0.1).max(0.1);
1063 doc! { "$gte": parsed - delta, "$lte": parsed + delta }
1064 }
1065 _ => {
1066 let op = Self::prefix_to_mongo_operator(value.prefix)?;
1067 doc! { op: parsed }
1068 }
1069 };
1070
1071 let mut filter = doc! { "value_quantity_value": value_condition };
1072 match parts.as_slice() {
1073 [_, system, code] => {
1075 if !system.is_empty() {
1076 filter.insert("value_quantity_system", *system);
1077 }
1078 if !code.is_empty() {
1079 filter.insert("value_quantity_unit", *code);
1080 }
1081 }
1082 [_, code] => {
1084 if !code.is_empty() {
1085 filter.insert("value_quantity_unit", *code);
1086 }
1087 }
1088 _ => {}
1089 }
1090
1091 Ok(filter)
1092 }
1093
1094 fn build_number_filter(&self, value: &SearchValue) -> StorageResult<Document> {
1095 let parsed = value.value.parse::<f64>().map_err(|e| {
1096 StorageError::Search(SearchError::QueryParseError {
1097 message: format!("Invalid number value '{}': {}", value.value, e),
1098 })
1099 })?;
1100
1101 match value.prefix {
1102 SearchPrefix::Ap => {
1103 let delta = (parsed.abs() * 0.1).max(0.1);
1104 Ok(doc! {
1105 "value_number": {
1106 "$gte": parsed - delta,
1107 "$lte": parsed + delta,
1108 }
1109 })
1110 }
1111 _ => {
1112 let op = Self::prefix_to_mongo_operator(value.prefix)?;
1113 Ok(doc! {
1114 "value_number": {
1115 op: parsed,
1116 }
1117 })
1118 }
1119 }
1120 }
1121
1122 fn prefix_to_mongo_operator(prefix: SearchPrefix) -> StorageResult<&'static str> {
1123 match prefix {
1124 SearchPrefix::Eq => Ok("$eq"),
1125 SearchPrefix::Ne => Ok("$ne"),
1126 SearchPrefix::Gt | SearchPrefix::Sa => Ok("$gt"),
1127 SearchPrefix::Lt | SearchPrefix::Eb => Ok("$lt"),
1128 SearchPrefix::Ge => Ok("$gte"),
1129 SearchPrefix::Le => Ok("$lte"),
1130 SearchPrefix::Ap => Ok("$eq"),
1131 }
1132 }
1133
1134 fn build_resource_filter(
1135 &self,
1136 tenant_id: &str,
1137 resource_type: &str,
1138 query: &SearchQuery,
1139 matched_ids: Option<&HashSet<String>>,
1140 cursor: Option<&PageCursor>,
1141 ) -> StorageResult<Document> {
1142 let mut conditions = vec![doc! {
1143 "tenant_id": tenant_id,
1144 "resource_type": resource_type,
1145 "is_deleted": false,
1146 }];
1147
1148 if let Some(ids) = matched_ids {
1149 let id_values = ids.iter().cloned().map(Bson::String).collect::<Vec<_>>();
1150 conditions.push(doc! {
1151 "id": { "$in": Bson::Array(id_values) }
1152 });
1153 }
1154
1155 for param in &query.parameters {
1156 match param.name.as_str() {
1157 "_id" => {
1158 conditions.push(self.build_resource_id_condition(param)?);
1159 }
1160 "_lastUpdated" => {
1161 conditions.extend(self.build_resource_last_updated_conditions(param)?);
1162 }
1163 _ => {}
1164 }
1165 }
1166
1167 if let Some(cursor) = cursor {
1168 conditions.push(self.build_cursor_condition(cursor)?);
1169 }
1170
1171 if conditions.len() == 1 {
1172 return Ok(conditions.remove(0));
1173 }
1174
1175 Ok(doc! {
1176 "$and": Bson::Array(conditions.into_iter().map(Bson::Document).collect())
1177 })
1178 }
1179
1180 fn build_resource_id_condition(&self, param: &SearchParameter) -> StorageResult<Document> {
1181 let mut ids = Vec::new();
1182
1183 for value in ¶m.values {
1184 if value.prefix != SearchPrefix::Eq {
1185 return Err(StorageError::Search(SearchError::QueryParseError {
1186 message: format!("Unsupported prefix '{}' for _id parameter", value.prefix),
1187 }));
1188 }
1189 ids.push(value.value.clone());
1190 }
1191
1192 if ids.len() == 1 {
1193 return Ok(doc! { "id": ids.remove(0) });
1194 }
1195
1196 Ok(doc! {
1197 "id": { "$in": Bson::Array(ids.into_iter().map(Bson::String).collect()) }
1198 })
1199 }
1200
1201 fn build_resource_last_updated_conditions(
1202 &self,
1203 param: &SearchParameter,
1204 ) -> StorageResult<Vec<Document>> {
1205 param
1206 .values
1207 .iter()
1208 .map(|value| self.build_date_filter(value, "last_updated"))
1209 .collect()
1210 }
1211
1212 fn build_cursor_condition(&self, cursor: &PageCursor) -> StorageResult<Document> {
1213 let timestamp = match cursor.sort_values().first() {
1214 Some(CursorValue::String(value)) => DateTime::parse_from_rfc3339(value)
1215 .map_err(|_| {
1216 StorageError::Search(SearchError::InvalidCursor {
1217 cursor: cursor.encode(),
1218 })
1219 })?
1220 .with_timezone(&Utc),
1221 _ => {
1222 return Err(StorageError::Search(SearchError::InvalidCursor {
1223 cursor: cursor.encode(),
1224 }));
1225 }
1226 };
1227
1228 let ts = chrono_to_bson(timestamp);
1229 let id = cursor.resource_id().to_string();
1230
1231 if cursor.direction() == CursorDirection::Previous {
1232 Ok(doc! {
1233 "$or": [
1234 { "last_updated": { "$gt": ts } },
1235 { "last_updated": ts, "id": { "$gt": id } }
1236 ]
1237 })
1238 } else {
1239 Ok(doc! {
1240 "$or": [
1241 { "last_updated": { "$lt": ts } },
1242 { "last_updated": ts, "id": { "$lt": id } }
1243 ]
1244 })
1245 }
1246 }
1247
1248 fn build_sort_document(
1249 &self,
1250 query: &SearchQuery,
1251 previous_mode: bool,
1252 ) -> StorageResult<Document> {
1253 if query.sort.is_empty() {
1254 return Ok(if previous_mode {
1255 doc! { "last_updated": 1_i32, "id": 1_i32 }
1256 } else {
1257 doc! { "last_updated": -1_i32, "id": -1_i32 }
1258 });
1259 }
1260
1261 let mut sort = Document::new();
1262
1263 for directive in &query.sort {
1264 let field = match directive.parameter.as_str() {
1265 "_lastUpdated" => "last_updated",
1266 "_id" | "id" => "id",
1267 other => {
1268 return Err(StorageError::Search(
1269 SearchError::UnsupportedParameterType {
1270 param_type: format!("sort parameter '{}'", other),
1271 },
1272 ));
1273 }
1274 };
1275
1276 let mut dir = if directive.direction == crate::types::SortDirection::Descending {
1277 -1_i32
1278 } else {
1279 1_i32
1280 };
1281
1282 if previous_mode {
1283 dir = -dir;
1284 }
1285
1286 sort.insert(field, dir);
1287 }
1288
1289 if !sort.contains_key("id") {
1290 sort.insert("id", if previous_mode { 1_i32 } else { -1_i32 });
1291 }
1292
1293 Ok(sort)
1294 }
1295
1296 fn document_to_stored_resource(
1297 &self,
1298 tenant: &TenantContext,
1299 fallback_resource_type: &str,
1300 doc: Document,
1301 ) -> StorageResult<StoredResource> {
1302 let resource_type = doc
1303 .get_str("resource_type")
1304 .ok()
1305 .unwrap_or(fallback_resource_type)
1306 .to_string();
1307
1308 let id = doc
1309 .get_str("id")
1310 .map_err(|e| internal_error(format!("Missing resource id in search result: {}", e)))?
1311 .to_string();
1312
1313 let version_id = doc
1314 .get_str("version_id")
1315 .map_err(|e| internal_error(format!("Missing version_id in search result: {}", e)))?
1316 .to_string();
1317
1318 let payload = doc.get_document("data").map_err(|e| {
1319 internal_error(format!("Missing resource payload in search result: {}", e))
1320 })?;
1321
1322 let content = bson::from_bson::<Value>(Bson::Document(payload.clone())).map_err(|e| {
1323 serialization_error(format!("Failed to deserialize resource payload: {}", e))
1324 })?;
1325
1326 let now = Utc::now();
1327 let created_at = doc
1328 .get_datetime("created_at")
1329 .map(bson_to_chrono)
1330 .unwrap_or(now);
1331
1332 let last_updated = doc
1333 .get_datetime("last_updated")
1334 .map(bson_to_chrono)
1335 .unwrap_or(created_at);
1336
1337 let deleted_at = match doc.get("deleted_at") {
1338 Some(Bson::DateTime(value)) => Some(bson_to_chrono(value)),
1339 _ => None,
1340 };
1341
1342 let fhir_version = doc
1343 .get_str("fhir_version")
1344 .ok()
1345 .and_then(FhirVersion::from_storage)
1346 .unwrap_or_else(FhirVersion::default_enabled);
1347
1348 Ok(StoredResource::from_storage(
1349 resource_type,
1350 id,
1351 version_id,
1352 tenant.tenant_id().clone(),
1353 content,
1354 created_at,
1355 last_updated,
1356 deleted_at,
1357 fhir_version,
1358 ))
1359 }
1360
1361 async fn find_matching_resources(
1362 &self,
1363 tenant: &TenantContext,
1364 resource_type: &str,
1365 search_params_str: &str,
1366 ) -> StorageResult<Vec<StoredResource>> {
1367 let parsed_params = parse_simple_search_params(search_params_str);
1368
1369 if parsed_params.is_empty() {
1370 return Ok(Vec::new());
1371 }
1372
1373 let search_params = self.build_search_parameters(resource_type, &parsed_params);
1374
1375 let query = SearchQuery {
1376 resource_type: resource_type.to_string(),
1377 parameters: search_params,
1378 count: Some(1000),
1379 ..Default::default()
1380 };
1381
1382 let result = <Self as SearchProvider>::search(self, tenant, &query).await?;
1383 Ok(result.resources.items)
1384 }
1385
1386 fn build_search_parameters(
1387 &self,
1388 resource_type: &str,
1389 params: &[(String, String)],
1390 ) -> Vec<SearchParameter> {
1391 let registry = self.search_registry().read();
1392
1393 params
1394 .iter()
1395 .map(|(name, value)| {
1396 let values = vec![SearchValue::parse(value)];
1397 let param_type =
1398 crate::search::resolve_param_type(®istry, resource_type, name, &values);
1399
1400 SearchParameter {
1401 name: name.clone(),
1402 param_type,
1403 modifier: None,
1404 values,
1405 chain: vec![],
1406 components: vec![],
1407 }
1408 })
1409 .collect()
1410 }
1411
1412 fn merge_unique(target: &mut Vec<StoredResource>, additions: Vec<StoredResource>) {
1413 let mut seen: HashSet<String> = target
1414 .iter()
1415 .map(|r| format!("{}/{}", r.resource_type(), r.id()))
1416 .collect();
1417 for resource in additions {
1418 let key = format!("{}/{}", resource.resource_type(), resource.id());
1419 if seen.insert(key) {
1420 target.push(resource);
1421 }
1422 }
1423 }
1424
1425 fn extract_references(content: &Value, search_param: &str) -> Vec<String> {
1426 let mut refs = Vec::new();
1427 if let Some(value) = content.get(search_param) {
1428 Self::collect_references_from_value(value, &mut refs);
1429 }
1430 refs
1431 }
1432
1433 fn collect_references_from_value(value: &Value, refs: &mut Vec<String>) {
1434 match value {
1435 Value::Object(obj) => {
1436 if let Some(Value::String(reference)) = obj.get("reference") {
1437 refs.push(reference.clone());
1438 }
1439 for v in obj.values() {
1440 Self::collect_references_from_value(v, refs);
1441 }
1442 }
1443 Value::Array(arr) => {
1444 for item in arr {
1445 Self::collect_references_from_value(item, refs);
1446 }
1447 }
1448 _ => {}
1449 }
1450 }
1451
1452 fn parse_reference(reference: &str) -> Option<(String, String)> {
1453 let trimmed = reference
1454 .strip_prefix("http://")
1455 .or_else(|| reference.strip_prefix("https://"))
1456 .unwrap_or(reference);
1457
1458 let mut segments: Vec<&str> = trimmed.split('/').filter(|s| !s.is_empty()).collect();
1459 if segments.len() < 2 {
1460 return None;
1461 }
1462
1463 let id = segments.pop()?.to_string();
1464 let resource_type = segments.pop()?.to_string();
1465
1466 if !resource_type
1467 .chars()
1468 .next()
1469 .is_some_and(|c| c.is_ascii_uppercase())
1470 {
1471 return None;
1472 }
1473
1474 Some((resource_type, id))
1475 }
1476
1477 async fn fetch_resource_by_type_id(
1478 &self,
1479 tenant: &TenantContext,
1480 resource_type: &str,
1481 id: &str,
1482 ) -> StorageResult<Option<StoredResource>> {
1483 let db = self.get_database().await?;
1484 let resources = db.collection::<Document>(MongoBackend::RESOURCES_COLLECTION);
1485 let tenant_id = tenant.tenant_id().as_str();
1486
1487 let doc = resources
1488 .find_one(doc! {
1489 "tenant_id": tenant_id,
1490 "resource_type": resource_type,
1491 "id": id,
1492 "is_deleted": false,
1493 })
1494 .await
1495 .map_err(|e| internal_error(format!("Failed to fetch included resource: {}", e)))?;
1496
1497 match doc {
1498 Some(doc) => Ok(Some(self.document_to_stored_resource(
1499 tenant,
1500 resource_type,
1501 doc,
1502 )?)),
1503 None => Ok(None),
1504 }
1505 }
1506}
1507
1508#[async_trait]
1509impl IncludeProvider for MongoBackend {
1510 async fn resolve_includes(
1511 &self,
1512 tenant: &TenantContext,
1513 resources: &[StoredResource],
1514 includes: &[IncludeDirective],
1515 ) -> StorageResult<Vec<StoredResource>> {
1516 if resources.is_empty() || includes.is_empty() {
1517 return Ok(Vec::new());
1518 }
1519
1520 let mut included = Vec::new();
1521 let mut seen: HashSet<String> = HashSet::new();
1522
1523 for include in includes {
1524 for resource in resources {
1525 if resource.resource_type() != include.source_type {
1526 continue;
1527 }
1528
1529 let refs = Self::extract_references(resource.content(), &include.search_param);
1530 for reference in refs {
1531 let Some((ref_type, ref_id)) = Self::parse_reference(&reference) else {
1532 continue;
1533 };
1534
1535 if let Some(target) = include.target_type.as_ref() {
1536 if ref_type != *target {
1537 continue;
1538 }
1539 }
1540
1541 let key = format!("{}/{}", ref_type, ref_id);
1542 if !seen.insert(key) {
1543 continue;
1544 }
1545
1546 if let Some(stored) = self
1547 .fetch_resource_by_type_id(tenant, &ref_type, &ref_id)
1548 .await?
1549 {
1550 included.push(stored);
1551 }
1552 }
1553 }
1554 }
1555
1556 Ok(included)
1557 }
1558}
1559
1560#[async_trait]
1561impl RevincludeProvider for MongoBackend {
1562 async fn resolve_revincludes(
1563 &self,
1564 tenant: &TenantContext,
1565 resources: &[StoredResource],
1566 revincludes: &[IncludeDirective],
1567 ) -> StorageResult<Vec<StoredResource>> {
1568 if resources.is_empty() || revincludes.is_empty() {
1569 return Ok(Vec::new());
1570 }
1571
1572 let db = self.get_database().await?;
1573 let tenant_id = tenant.tenant_id().as_str();
1574 let search_index = db.collection::<Document>(MongoBackend::SEARCH_INDEX_COLLECTION);
1575 let resources_collection = db.collection::<Document>(MongoBackend::RESOURCES_COLLECTION);
1576
1577 let mut included = Vec::new();
1578 let mut seen: HashSet<String> = HashSet::new();
1579
1580 for revinclude in revincludes {
1581 if revinclude.source_type.is_empty() {
1582 continue;
1583 }
1584
1585 let mut reference_values: Vec<String> = Vec::with_capacity(resources.len() * 2);
1586 for resource in resources {
1587 reference_values.push(format!("{}/{}", resource.resource_type(), resource.id()));
1588 reference_values.push(resource.id().to_string());
1589 }
1590 reference_values.sort();
1591 reference_values.dedup();
1592
1593 if reference_values.is_empty() {
1594 continue;
1595 }
1596
1597 let bson_values: Vec<Bson> = reference_values.into_iter().map(Bson::String).collect();
1598 let index_filter = doc! {
1599 "tenant_id": tenant_id,
1600 "resource_type": &revinclude.source_type,
1601 "param_name": &revinclude.search_param,
1602 "value_reference": { "$in": Bson::Array(bson_values) },
1603 };
1604
1605 let matching_ids: Vec<String> = search_index
1606 .distinct("resource_id", index_filter)
1607 .await
1608 .map_err(|e| {
1609 internal_error(format!(
1610 "Failed to query search_index for revinclude: {}",
1611 e
1612 ))
1613 })?
1614 .into_iter()
1615 .filter_map(|value| value.as_str().map(ToString::to_string))
1616 .collect();
1617
1618 if matching_ids.is_empty() {
1619 continue;
1620 }
1621
1622 let id_bson: Vec<Bson> = matching_ids.into_iter().map(Bson::String).collect();
1623 let resource_filter = doc! {
1624 "tenant_id": tenant_id,
1625 "resource_type": &revinclude.source_type,
1626 "is_deleted": false,
1627 "id": { "$in": Bson::Array(id_bson) },
1628 };
1629
1630 let docs =
1631 collect_documents(resources_collection.find(resource_filter).await.map_err(
1632 |e| internal_error(format!("Failed to fetch revinclude resources: {}", e)),
1633 )?)
1634 .await?;
1635
1636 for doc in docs {
1637 let stored =
1638 self.document_to_stored_resource(tenant, &revinclude.source_type, doc)?;
1639 let key = format!("{}/{}", stored.resource_type(), stored.id());
1640 if seen.insert(key) {
1641 included.push(stored);
1642 }
1643 }
1644 }
1645
1646 Ok(included)
1647 }
1648}