Skip to main content

nodedb_cluster/distributed_spatial/
coordinator.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Spatial scatter-gather coordinator for cross-shard spatial queries.
4//!
5//! Same pattern as vector/timeseries distributed queries:
6//! coordinator → VShardEnvelope per shard → collect responses → merge.
7
8use super::merge::{ShardSpatialResult, SpatialResultMerger};
9use crate::wire::{VShardEnvelope, VShardMessageType};
10
11/// Wire message for spatial scatter request payload (zerompk).
12#[derive(Debug, Clone, zerompk::ToMessagePack, zerompk::FromMessagePack)]
13pub struct SpatialScatterPayload {
14    pub collection: String,
15    pub field: String,
16    pub predicate: String,
17    /// Typed query geometry, parsed and validated on the originating CP.
18    pub query_geometry: nodedb_types::geometry::Geometry,
19    pub distance_meters: f64,
20    pub limit: u32,
21}
22
23/// Scatter-gather coordinator for distributed spatial queries.
24pub struct SpatialScatterGather {
25    pub source_node: u64,
26    pub shard_ids: Vec<u32>,
27    merger: SpatialResultMerger,
28}
29
30impl SpatialScatterGather {
31    pub fn new(source_node: u64, shard_ids: Vec<u32>) -> Self {
32        let count = shard_ids.len();
33        Self {
34            source_node,
35            shard_ids,
36            merger: SpatialResultMerger::new(count),
37        }
38    }
39
40    /// Build scatter envelopes for a spatial query.
41    ///
42    /// Each envelope contains the query geometry + predicate type + distance
43    /// as JSON payload.
44    pub fn build_scatter_envelopes(
45        &self,
46        collection: &str,
47        field: &str,
48        predicate: &str,
49        query_geometry: nodedb_types::geometry::Geometry,
50        distance_meters: f64,
51        limit: usize,
52    ) -> Vec<(u32, VShardEnvelope)> {
53        let msg = SpatialScatterPayload {
54            collection: collection.to_string(),
55            field: field.to_string(),
56            predicate: predicate.to_string(),
57            query_geometry,
58            distance_meters,
59            limit: limit as u32,
60        };
61        let payload_bytes =
62            zerompk::to_msgpack_vec(&msg).expect("SpatialScatterPayload is always serializable");
63
64        self.shard_ids
65            .iter()
66            .map(|&shard_id| {
67                let env = VShardEnvelope::new(
68                    VShardMessageType::SpatialScatterRequest,
69                    self.source_node,
70                    0, // target_node resolved by routing table
71                    shard_id,
72                    payload_bytes.clone(),
73                );
74                (shard_id, env)
75            })
76            .collect()
77    }
78
79    /// Record a shard's response.
80    pub fn record_response(&mut self, result: &ShardSpatialResult) {
81        self.merger.add_shard_result(result);
82    }
83
84    /// Whether all shards have responded.
85    pub fn all_responded(&self) -> bool {
86        self.merger.all_responded()
87    }
88
89    /// Merge results from all shards.
90    pub fn merge_results(
91        &mut self,
92        limit: usize,
93        sort_by_distance: bool,
94    ) -> Vec<super::merge::SpatialHit> {
95        self.merger.merge(limit, sort_by_distance)
96    }
97
98    pub fn response_count(&self) -> usize {
99        self.merger.response_count()
100    }
101}
102
#[cfg(test)]
mod tests {
    use super::super::merge::{ShardSpatialResult, SpatialHit};
    use super::*;

    /// One request envelope is produced per shard, each addressed to its shard.
    #[test]
    fn scatter_envelopes_built() {
        let gather = SpatialScatterGather::new(1, vec![0, 1, 2]);
        let origin = nodedb_types::geometry::Geometry::point(0.0, 0.0);

        let envelopes = gather.build_scatter_envelopes(
            "buildings",
            "geom",
            "st_dwithin",
            origin,
            1000.0,
            100,
        );

        assert_eq!(envelopes.len(), 3);
        envelopes.iter().for_each(|(shard_id, envelope)| {
            assert_eq!(envelope.msg_type, VShardMessageType::SpatialScatterRequest);
            assert_eq!(envelope.vshard_id, *shard_id);
        });
    }

    /// Responses from both shards are collected and merged nearest-first.
    #[test]
    fn collect_and_merge() {
        let mut gather = SpatialScatterGather::new(1, vec![0, 1]);

        let far_response = ShardSpatialResult {
            shard_id: 0,
            hits: vec![SpatialHit {
                doc_id: "a".into(),
                shard_id: 0,
                distance_meters: 200.0,
            }],
            success: true,
            error: None,
        };
        let near_response = ShardSpatialResult {
            shard_id: 1,
            hits: vec![SpatialHit {
                doc_id: "b".into(),
                shard_id: 1,
                distance_meters: 50.0,
            }],
            success: true,
            error: None,
        };

        // Shard 0 reports first, then shard 1.
        for response in [&far_response, &near_response] {
            gather.record_response(response);
        }
        assert!(gather.all_responded());

        let merged = gather.merge_results(10, true);
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].doc_id, "b"); // nearer
    }
}