Skip to main content

helios_persistence/composite/
merger.rs

1//! Result merging strategies for composite storage.
2//!
3//! This module provides strategies for merging search results from multiple backends.
4//!
5//! # Merge Strategies
6//!
7//! | Strategy | Description | Use Case |
8//! |----------|-------------|----------|
9//! | Intersection | Results must appear in all sources | Precise filtering |
10//! | Union | Results from any source | Broad search |
//! | PrimaryEnriched | Primary results, enriched by secondaries (currently returns the primary results unchanged) | Metadata augmentation |
//! | SecondaryFiltered | Keep only the primary results that also appear in at least one secondary result | Candidate validation |
13//!
14//! # Example
15//!
16//! ```ignore
17//! use helios_persistence::composite::merger::{ResultMerger, MergeOptions, MergeStrategy};
18//!
19//! let merger = ResultMerger::new();
20//!
21//! let merged = merger.merge(
22//!     primary_result,
23//!     vec![("es".to_string(), es_result)],
24//!     MergeOptions {
25//!         strategy: MergeStrategy::Intersection,
26//!         preserve_primary_order: true,
27//!         deduplicate: true,
28//!     },
29//! )?;
30//! ```
31
32use std::collections::{HashMap, HashSet};
33
34use crate::core::SearchResult;
35use crate::error::StorageResult;
36use crate::types::{Page, PageInfo, StoredResource};
37
38use super::router::MergeStrategy;
39
/// Options for merging results.
///
/// Passed by value to [`ResultMerger::merge`], which picks the merge routine from
/// `strategy` and hands the whole options value to that routine by reference.
#[derive(Debug, Clone)]
pub struct MergeOptions {
    /// The merge strategy to use.
    pub strategy: MergeStrategy,

    /// Whether to preserve the ordering from the primary result.
    ///
    /// The union merge consults this flag: when it is `false`, the combined resources
    /// are re-sorted in descending order of `last_modified()`.
    pub preserve_primary_order: bool,

    /// Whether to deduplicate results.
    ///
    /// The intersection merge consults this flag to decide whether the combined
    /// `included` list is deduplicated by its `type/id` key.
    pub deduplicate: bool,
}
52
53impl Default for MergeOptions {
54    fn default() -> Self {
55        Self {
56            strategy: MergeStrategy::Intersection,
57            preserve_primary_order: true,
58            deduplicate: true,
59        }
60    }
61}
62
/// Result merger for combining results from multiple backends.
///
/// The merger itself is stateless apart from a result cap; it can be cloned and
/// printed for diagnostics.
#[derive(Debug, Clone)]
pub struct ResultMerger {
    /// Maximum results to return.
    ///
    /// Upper bound on the number of primary resources placed in a merged page by the
    /// intersection, union and secondary-filtered strategies.
    max_results: usize,
}
68
69impl ResultMerger {
70    /// Creates a new result merger.
71    pub fn new() -> Self {
72        Self { max_results: 1000 }
73    }
74
75    /// Creates a merger with a custom max results limit.
76    pub fn with_max_results(mut self, max: usize) -> Self {
77        self.max_results = max;
78        self
79    }
80
81    /// Merges results from primary and auxiliary backends.
82    pub fn merge(
83        &self,
84        primary: SearchResult,
85        auxiliary: Vec<(String, SearchResult)>,
86        options: MergeOptions,
87    ) -> StorageResult<SearchResult> {
88        match options.strategy {
89            MergeStrategy::Intersection => self.merge_intersection(primary, auxiliary, &options),
90            MergeStrategy::Union => self.merge_union(primary, auxiliary, &options),
91            MergeStrategy::PrimaryEnriched => {
92                self.merge_primary_enriched(primary, auxiliary, &options)
93            }
94            MergeStrategy::SecondaryFiltered => {
95                self.merge_secondary_filtered(primary, auxiliary, &options)
96            }
97        }
98    }
99
100    /// Intersection merge: results must appear in all sources.
101    fn merge_intersection(
102        &self,
103        primary: SearchResult,
104        auxiliary: Vec<(String, SearchResult)>,
105        options: &MergeOptions,
106    ) -> StorageResult<SearchResult> {
107        if auxiliary.is_empty() {
108            return Ok(primary);
109        }
110
111        // Build set of IDs from each auxiliary source
112        let aux_id_sets: Vec<HashSet<String>> = auxiliary
113            .iter()
114            .map(|(_, result)| result.resources.items.iter().map(resource_key).collect())
115            .collect();
116
117        // Filter primary to only include resources that appear in ALL auxiliary sets
118        let mut filtered_items = Vec::new();
119        for resource in primary.resources.items {
120            let key = resource_key(&resource);
121            if aux_id_sets.iter().all(|set| set.contains(&key)) {
122                filtered_items.push(resource);
123            }
124        }
125
126        // Limit results
127        if filtered_items.len() > self.max_results {
128            filtered_items.truncate(self.max_results);
129        }
130
131        // Combine included resources
132        let mut all_included = primary.included;
133        for (_, aux_result) in auxiliary {
134            all_included.extend(aux_result.included);
135        }
136
137        if options.deduplicate {
138            all_included = deduplicate_resources(all_included);
139        }
140
141        Ok(SearchResult {
142            resources: Page::new(filtered_items, primary.resources.page_info),
143            included: all_included,
144            total: None, // Total is now uncertain due to filtering
145        })
146    }
147
148    /// Union merge: results from any source (OR).
149    fn merge_union(
150        &self,
151        primary: SearchResult,
152        auxiliary: Vec<(String, SearchResult)>,
153        options: &MergeOptions,
154    ) -> StorageResult<SearchResult> {
155        let mut all_resources = primary.resources.items;
156        let mut seen_keys: HashSet<String> = all_resources.iter().map(resource_key).collect();
157
158        // Add resources from auxiliary sources
159        for (_, aux_result) in auxiliary {
160            for resource in aux_result.resources.items {
161                let key = resource_key(&resource);
162                if !seen_keys.contains(&key) {
163                    seen_keys.insert(key);
164                    all_resources.push(resource);
165                }
166            }
167        }
168
169        // Sort if not preserving primary order
170        if !options.preserve_primary_order {
171            // Sort by last updated, descending
172            all_resources.sort_by_key(|r| std::cmp::Reverse(r.last_modified()));
173        }
174
175        // Limit results
176        if all_resources.len() > self.max_results {
177            all_resources.truncate(self.max_results);
178        }
179
180        Ok(SearchResult {
181            resources: Page::new(all_resources, primary.resources.page_info),
182            included: primary.included,
183            total: None,
184        })
185    }
186
187    /// Primary enriched: primary results with metadata from secondaries.
188    fn merge_primary_enriched(
189        &self,
190        primary: SearchResult,
191        _auxiliary: Vec<(String, SearchResult)>,
192        _options: &MergeOptions,
193    ) -> StorageResult<SearchResult> {
194        // For FHIR resources, we generally don't want to modify the content
195        // This strategy is more about adding metadata that doesn't change resources
196        // For now, just return primary results unchanged
197        Ok(primary)
198    }
199
200    /// Secondary filtered: filter secondary results through primary.
201    fn merge_secondary_filtered(
202        &self,
203        primary: SearchResult,
204        auxiliary: Vec<(String, SearchResult)>,
205        _options: &MergeOptions,
206    ) -> StorageResult<SearchResult> {
207        if auxiliary.is_empty() {
208            return Ok(primary);
209        }
210
211        // Get IDs from all auxiliary sources (union of auxiliary IDs)
212        let mut aux_ids: HashSet<String> = HashSet::new();
213        for (_, aux_result) in &auxiliary {
214            for resource in &aux_result.resources.items {
215                aux_ids.insert(resource_key(resource));
216            }
217        }
218
219        // Filter primary to only include resources that appear in auxiliary results
220        let filtered_items: Vec<_> = primary
221            .resources
222            .items
223            .into_iter()
224            .filter(|r| aux_ids.contains(&resource_key(r)))
225            .take(self.max_results)
226            .collect();
227
228        Ok(SearchResult {
229            resources: Page::new(filtered_items, primary.resources.page_info),
230            included: primary.included,
231            total: None,
232        })
233    }
234
235    /// Merges ID sets from multiple sources.
236    pub fn merge_ids(&self, sources: Vec<Vec<String>>, strategy: MergeStrategy) -> Vec<String> {
237        match strategy {
238            MergeStrategy::Intersection => self.intersect_ids(sources),
239            MergeStrategy::Union => self.union_ids(sources),
240            _ => self.intersect_ids(sources),
241        }
242    }
243
244    /// Computes intersection of ID sets.
245    fn intersect_ids(&self, sources: Vec<Vec<String>>) -> Vec<String> {
246        if sources.is_empty() {
247            return Vec::new();
248        }
249
250        if sources.len() == 1 {
251            return sources.into_iter().next().unwrap();
252        }
253
254        let mut sets: Vec<HashSet<String>> = sources
255            .into_iter()
256            .map(|v| v.into_iter().collect())
257            .collect();
258
259        // Sort by size (smallest first for efficiency)
260        sets.sort_by_key(|s| s.len());
261
262        let mut result: HashSet<String> = sets.remove(0);
263        for set in sets {
264            result = result.intersection(&set).cloned().collect();
265        }
266
267        result.into_iter().collect()
268    }
269
270    /// Computes union of ID sets.
271    fn union_ids(&self, sources: Vec<Vec<String>>) -> Vec<String> {
272        let mut result: HashSet<String> = HashSet::new();
273        for source in sources {
274            result.extend(source);
275        }
276        result.into_iter().collect()
277    }
278}
279
280impl Default for ResultMerger {
281    fn default() -> Self {
282        Self::new()
283    }
284}
285
286/// Creates a unique key for a resource.
287fn resource_key(resource: &StoredResource) -> String {
288    format!("{}/{}", resource.resource_type(), resource.id())
289}
290
291/// Deduplicates resources by their key.
292fn deduplicate_resources(resources: Vec<StoredResource>) -> Vec<StoredResource> {
293    let mut seen = HashSet::new();
294    resources
295        .into_iter()
296        .filter(|r| seen.insert(resource_key(r)))
297        .collect()
298}
299
/// Weighted result for relevance-based merging.
///
/// [`RelevanceMerger::merge_with_relevance`] builds one of these per resource in
/// each backend's result before sorting and deduplicating them.
#[derive(Debug, Clone)]
pub struct WeightedResult {
    /// The resource.
    pub resource: StoredResource,

    /// Relevance score (higher is better).
    ///
    /// As computed by `merge_with_relevance`: `1 / (position + 1)` within the
    /// backend's result, multiplied by that backend's weight.
    pub score: f64,

    /// Source backend ID.
    pub source: String,
}
312
/// Relevance-based merger for search results.
///
/// Holds a per-backend weight table; it can be cloned and printed for diagnostics.
#[derive(Debug, Clone)]
pub struct RelevanceMerger {
    /// Backend weights for scoring.
    ///
    /// Keyed by backend ID. Backends without an entry are scored with a weight of
    /// `1.0`.
    weights: HashMap<String, f64>,
}
318
319impl RelevanceMerger {
320    /// Creates a new relevance merger.
321    pub fn new() -> Self {
322        Self {
323            weights: HashMap::new(),
324        }
325    }
326
327    /// Sets weight for a backend.
328    pub fn with_weight(mut self, backend_id: impl Into<String>, weight: f64) -> Self {
329        self.weights.insert(backend_id.into(), weight);
330        self
331    }
332
333    /// Merges results with relevance scoring.
334    pub fn merge_with_relevance(
335        &self,
336        results: Vec<(String, SearchResult)>,
337        max_results: usize,
338    ) -> SearchResult {
339        let mut weighted: Vec<WeightedResult> = Vec::new();
340
341        for (source, result) in results {
342            let base_weight = self.weights.get(&source).copied().unwrap_or(1.0);
343
344            for (idx, resource) in result.resources.items.into_iter().enumerate() {
345                // Score based on position and source weight
346                // Earlier positions get higher scores
347                let position_score = 1.0 / (idx as f64 + 1.0);
348                let score = position_score * base_weight;
349
350                weighted.push(WeightedResult {
351                    resource,
352                    score,
353                    source: source.clone(),
354                });
355            }
356        }
357
358        // Sort by score descending
359        weighted.sort_by(|a, b| {
360            b.score
361                .partial_cmp(&a.score)
362                .unwrap_or(std::cmp::Ordering::Equal)
363        });
364
365        // Deduplicate, keeping highest scored
366        let mut seen = HashSet::new();
367        let final_results: Vec<StoredResource> = weighted
368            .into_iter()
369            .filter(|w| seen.insert(resource_key(&w.resource)))
370            .take(max_results)
371            .map(|w| w.resource)
372            .collect();
373
374        SearchResult {
375            resources: Page::new(final_results, PageInfo::end()),
376            included: Vec::new(),
377            total: None,
378        }
379    }
380}
381
382impl Default for RelevanceMerger {
383    fn default() -> Self {
384        Self::new()
385    }
386}
387
388#[cfg(test)]
389mod tests {
390    use super::*;
391    use crate::tenant::TenantId;
392    use helios_fhir::FhirVersion;
393
394    fn make_resource(resource_type: &str, id: &str) -> StoredResource {
395        StoredResource::new(
396            resource_type,
397            id,
398            TenantId::new("test"),
399            serde_json::json!({"resourceType": resource_type, "id": id}),
400            FhirVersion::default(),
401        )
402    }
403
404    fn make_result(resources: Vec<StoredResource>) -> SearchResult {
405        SearchResult {
406            resources: Page::new(resources, PageInfo::end()),
407            included: Vec::new(),
408            total: None,
409        }
410    }
411
412    #[test]
413    fn test_intersection_merge() {
414        let merger = ResultMerger::new();
415
416        let primary = make_result(vec![
417            make_resource("Patient", "1"),
418            make_resource("Patient", "2"),
419            make_resource("Patient", "3"),
420        ]);
421
422        let aux = vec![(
423            "es".to_string(),
424            make_result(vec![
425                make_resource("Patient", "2"),
426                make_resource("Patient", "3"),
427                make_resource("Patient", "4"),
428            ]),
429        )];
430
431        let merged = merger.merge(primary, aux, MergeOptions::default()).unwrap();
432
433        // Only 2 and 3 should remain (intersection)
434        assert_eq!(merged.resources.len(), 2);
435        let ids: Vec<_> = merged.resources.items.iter().map(|r| r.id()).collect();
436        assert!(ids.contains(&"2"));
437        assert!(ids.contains(&"3"));
438    }
439
440    #[test]
441    fn test_union_merge() {
442        let merger = ResultMerger::new();
443
444        let primary = make_result(vec![
445            make_resource("Patient", "1"),
446            make_resource("Patient", "2"),
447        ]);
448
449        let aux = vec![(
450            "es".to_string(),
451            make_result(vec![
452                make_resource("Patient", "2"),
453                make_resource("Patient", "3"),
454            ]),
455        )];
456
457        let merged = merger
458            .merge(
459                primary,
460                aux,
461                MergeOptions {
462                    strategy: MergeStrategy::Union,
463                    ..Default::default()
464                },
465            )
466            .unwrap();
467
468        // All unique resources (1, 2, 3)
469        assert_eq!(merged.resources.len(), 3);
470    }
471
472    #[test]
473    fn test_secondary_filtered_merge() {
474        let merger = ResultMerger::new();
475
476        let primary = make_result(vec![
477            make_resource("Patient", "1"),
478            make_resource("Patient", "2"),
479            make_resource("Patient", "3"),
480        ]);
481
482        let aux = vec![(
483            "graph".to_string(),
484            make_result(vec![make_resource("Patient", "2")]),
485        )];
486
487        let merged = merger
488            .merge(
489                primary,
490                aux,
491                MergeOptions {
492                    strategy: MergeStrategy::SecondaryFiltered,
493                    ..Default::default()
494                },
495            )
496            .unwrap();
497
498        // Only 2 should remain (filtered by secondary)
499        assert_eq!(merged.resources.len(), 1);
500        assert_eq!(merged.resources.items[0].id(), "2");
501    }
502
503    #[test]
504    fn test_id_intersection() {
505        let merger = ResultMerger::new();
506
507        let sources = vec![
508            vec!["1".to_string(), "2".to_string(), "3".to_string()],
509            vec!["2".to_string(), "3".to_string(), "4".to_string()],
510            vec!["3".to_string(), "4".to_string(), "5".to_string()],
511        ];
512
513        let result = merger.merge_ids(sources, MergeStrategy::Intersection);
514        assert_eq!(result.len(), 1);
515        assert!(result.contains(&"3".to_string()));
516    }
517
518    #[test]
519    fn test_id_union() {
520        let merger = ResultMerger::new();
521
522        let sources = vec![
523            vec!["1".to_string(), "2".to_string()],
524            vec!["3".to_string(), "4".to_string()],
525        ];
526
527        let result = merger.merge_ids(sources, MergeStrategy::Union);
528        assert_eq!(result.len(), 4);
529    }
530
531    #[test]
532    fn test_relevance_merge() {
533        let merger = RelevanceMerger::new()
534            .with_weight("primary", 2.0)
535            .with_weight("search", 1.0);
536
537        let results = vec![
538            (
539                "primary".to_string(),
540                make_result(vec![
541                    make_resource("Patient", "1"),
542                    make_resource("Patient", "2"),
543                ]),
544            ),
545            (
546                "search".to_string(),
547                make_result(vec![
548                    make_resource("Patient", "3"),
549                    make_resource("Patient", "1"), // duplicate
550                ]),
551            ),
552        ];
553
554        let merged = merger.merge_with_relevance(results, 10);
555
556        // Patient 1 should be first (highest weight from primary)
557        assert_eq!(merged.resources.items[0].id(), "1");
558        // Should have 3 unique resources
559        assert_eq!(merged.resources.len(), 3);
560    }
561}