Skip to main content

nodedb_cluster/distributed_spatial/
merge.rs

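//! Cross-shard spatial result merging.
//!
//! Unlike vector search (which needs distance re-ranking), most spatial
//! predicates are boolean — a document either matches or it doesn't — so the
//! merge is a simple concatenation of shard results with deduplication. For
//! distance predicates (ST_DWithin) the merged hits are additionally sorted by
//! distance, nearest first, before the result limit is applied.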
6
7use serde::{Deserialize, Serialize};
8
9/// A single spatial match from a shard.
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct SpatialHit {
12    /// Document ID.
13    pub doc_id: String,
14    /// Which shard produced this hit.
15    pub shard_id: u16,
16    /// Distance to query geometry in meters (for ST_DWithin ordering).
17    /// 0.0 for non-distance predicates (ST_Contains, ST_Intersects).
18    pub distance_meters: f64,
19}
20
21/// Results from a single shard's local spatial query.
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct ShardSpatialResult {
24    pub shard_id: u16,
25    pub hits: Vec<SpatialHit>,
26    pub success: bool,
27    pub error: Option<String>,
28}
29
30/// Merges spatial results from multiple shards.
31///
32/// For boolean predicates (ST_Contains, ST_Intersects): simple concatenation.
33/// For distance predicates (ST_DWithin): merge and sort by distance, take limit.
34pub struct SpatialResultMerger {
35    all_hits: Vec<SpatialHit>,
36    responded: usize,
37    expected: usize,
38}
39
40impl SpatialResultMerger {
41    pub fn new(expected_shards: usize) -> Self {
42        Self {
43            all_hits: Vec::new(),
44            responded: 0,
45            expected: expected_shards,
46        }
47    }
48
49    /// Add a shard's results.
50    pub fn add_shard_result(&mut self, result: &ShardSpatialResult) {
51        if result.success {
52            self.all_hits.extend_from_slice(&result.hits);
53        }
54        self.responded += 1;
55    }
56
57    /// Whether all expected shards have responded.
58    pub fn all_responded(&self) -> bool {
59        self.responded >= self.expected
60    }
61
62    /// Merge all results: deduplicate by doc_id, optionally sort by distance.
63    ///
64    /// For ST_DWithin, results are sorted by distance (nearest first).
65    /// For boolean predicates, order is arbitrary. Truncates to `limit`.
66    pub fn merge(&mut self, limit: usize, sort_by_distance: bool) -> Vec<SpatialHit> {
67        // Deduplicate by doc_id (a document can only appear on one shard,
68        // but defensive in case of ghost stubs or migration overlap).
69        let mut seen = std::collections::HashSet::new();
70        self.all_hits.retain(|h| seen.insert(h.doc_id.clone()));
71
72        if sort_by_distance {
73            self.all_hits.sort_by(|a, b| {
74                a.distance_meters
75                    .partial_cmp(&b.distance_meters)
76                    .unwrap_or(std::cmp::Ordering::Equal)
77            });
78        }
79
80        self.all_hits.truncate(limit);
81        self.all_hits.clone()
82    }
83
84    /// Total hits collected (before merge).
85    pub fn total_hits(&self) -> usize {
86        self.all_hits.len()
87    }
88
89    pub fn response_count(&self) -> usize {
90        self.responded
91    }
92}
93
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building a single hit.
    fn hit(doc_id: &str, shard_id: u16, distance_meters: f64) -> SpatialHit {
        SpatialHit {
            doc_id: doc_id.into(),
            shard_id,
            distance_meters,
        }
    }

    /// Shorthand for a successful shard response carrying `hits`.
    fn succeeded(shard_id: u16, hits: Vec<SpatialHit>) -> ShardSpatialResult {
        ShardSpatialResult {
            shard_id,
            hits,
            success: true,
            error: None,
        }
    }

    #[test]
    fn merge_two_shards_boolean() {
        let mut merger = SpatialResultMerger::new(2);
        merger.add_shard_result(&succeeded(0, vec![hit("a", 0, 0.0), hit("b", 0, 0.0)]));
        merger.add_shard_result(&succeeded(1, vec![hit("c", 1, 0.0)]));

        assert!(merger.all_responded());
        let merged = merger.merge(10, false);
        assert_eq!(merged.len(), 3);
    }

    #[test]
    fn merge_with_distance_sort() {
        let mut merger = SpatialResultMerger::new(2);
        merger.add_shard_result(&succeeded(0, vec![hit("far", 0, 500.0)]));
        merger.add_shard_result(&succeeded(1, vec![hit("near", 1, 100.0)]));

        let merged = merger.merge(10, true);
        assert_eq!(merged[0].doc_id, "near");
        assert_eq!(merged[1].doc_id, "far");
    }

    #[test]
    fn merge_with_failed_shard() {
        let mut merger = SpatialResultMerger::new(2);
        merger.add_shard_result(&succeeded(0, vec![hit("a", 0, 0.0)]));
        merger.add_shard_result(&ShardSpatialResult {
            shard_id: 1,
            hits: Vec::new(),
            success: false,
            error: Some("timeout".into()),
        });

        assert_eq!(merger.merge(10, false).len(), 1);
    }

    #[test]
    fn merge_respects_limit() {
        let hits: Vec<SpatialHit> = (0..100)
            .map(|i| hit(&format!("d{i}"), 0, f64::from(i)))
            .collect();
        let mut merger = SpatialResultMerger::new(1);
        merger.add_shard_result(&succeeded(0, hits));

        let merged = merger.merge(5, true);
        assert_eq!(merged.len(), 5);
        assert_eq!(merged[0].distance_meters, 0.0);
    }
}