nodedb_cluster/distributed_spatial/
coordinator.rs1use super::merge::{ShardSpatialResult, SpatialResultMerger};
9use crate::wire::{VShardEnvelope, VShardMessageType};
10
11#[derive(Debug, Clone, zerompk::ToMessagePack, zerompk::FromMessagePack)]
13pub struct SpatialScatterPayload {
14 pub collection: String,
15 pub field: String,
16 pub predicate: String,
17 pub query_geometry: nodedb_types::geometry::Geometry,
19 pub distance_meters: f64,
20 pub limit: u32,
21}
22
23pub struct SpatialScatterGather {
25 pub source_node: u64,
26 pub shard_ids: Vec<u32>,
27 merger: SpatialResultMerger,
28}
29
30impl SpatialScatterGather {
31 pub fn new(source_node: u64, shard_ids: Vec<u32>) -> Self {
32 let count = shard_ids.len();
33 Self {
34 source_node,
35 shard_ids,
36 merger: SpatialResultMerger::new(count),
37 }
38 }
39
40 pub fn build_scatter_envelopes(
45 &self,
46 collection: &str,
47 field: &str,
48 predicate: &str,
49 query_geometry: nodedb_types::geometry::Geometry,
50 distance_meters: f64,
51 limit: usize,
52 ) -> Vec<(u32, VShardEnvelope)> {
53 let msg = SpatialScatterPayload {
54 collection: collection.to_string(),
55 field: field.to_string(),
56 predicate: predicate.to_string(),
57 query_geometry,
58 distance_meters,
59 limit: limit as u32,
60 };
61 let payload_bytes =
62 zerompk::to_msgpack_vec(&msg).expect("SpatialScatterPayload is always serializable");
63
64 self.shard_ids
65 .iter()
66 .map(|&shard_id| {
67 let env = VShardEnvelope::new(
68 VShardMessageType::SpatialScatterRequest,
69 self.source_node,
70 0, shard_id,
72 payload_bytes.clone(),
73 );
74 (shard_id, env)
75 })
76 .collect()
77 }
78
79 pub fn record_response(&mut self, result: &ShardSpatialResult) {
81 self.merger.add_shard_result(result);
82 }
83
84 pub fn all_responded(&self) -> bool {
86 self.merger.all_responded()
87 }
88
89 pub fn merge_results(
91 &mut self,
92 limit: usize,
93 sort_by_distance: bool,
94 ) -> Vec<super::merge::SpatialHit> {
95 self.merger.merge(limit, sort_by_distance)
96 }
97
98 pub fn response_count(&self) -> usize {
99 self.merger.response_count()
100 }
101}
102
#[cfg(test)]
mod tests {
    use super::super::merge::{ShardSpatialResult, SpatialHit};
    use super::*;

    /// One envelope is produced per shard, typed as a scatter request and
    /// addressed to the shard it is paired with.
    #[test]
    fn scatter_envelopes_built() {
        let gather = SpatialScatterGather::new(1, vec![0, 1, 2]);
        let origin = nodedb_types::geometry::Geometry::point(0.0, 0.0);

        let envelopes =
            gather.build_scatter_envelopes("buildings", "geom", "st_dwithin", origin, 1000.0, 100);

        assert_eq!(envelopes.len(), 3);
        for pair in &envelopes {
            let (target, envelope) = pair;
            assert_eq!(envelope.msg_type, VShardMessageType::SpatialScatterRequest);
            assert_eq!(envelope.vshard_id, *target);
        }
    }

    /// Two single-hit responses are recorded; merging with distance sorting
    /// puts the closer hit first.
    #[test]
    fn collect_and_merge() {
        let mut gather = SpatialScatterGather::new(1, vec![0, 1]);

        // Shard 0 reports the farther hit, shard 1 the closer one.
        let responses = [
            ShardSpatialResult {
                shard_id: 0,
                hits: vec![SpatialHit {
                    doc_id: "a".into(),
                    shard_id: 0,
                    distance_meters: 200.0,
                }],
                success: true,
                error: None,
            },
            ShardSpatialResult {
                shard_id: 1,
                hits: vec![SpatialHit {
                    doc_id: "b".into(),
                    shard_id: 1,
                    distance_meters: 50.0,
                }],
                success: true,
                error: None,
            },
        ];
        for response in &responses {
            gather.record_response(response);
        }
        assert!(gather.all_responded());

        let merged = gather.merge_results(10, true);
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].doc_id, "b");
    }
}