nodedb_cluster/distributed_array/coordinator/
read.rs1use std::sync::Arc;
10
11use crate::circuit_breaker::CircuitBreaker;
12use crate::error::{ClusterError, Result};
13
14use super::super::merge::{ArrayAggPartial, merge_slice_rows, reduce_agg_partials};
15use super::super::rpc::ShardRpcDispatch;
16use super::super::scatter::{FanOutParams, FanOutPartitionedParams, fan_out, fan_out_partitioned};
17use super::super::wire::{
18 ArrayShardAggReq, ArrayShardAggResp, ArrayShardDeleteReq, ArrayShardDeleteResp,
19 ArrayShardSliceReq, ArrayShardSliceResp, ArrayShardSurrogateBitmapReq,
20 ArrayShardSurrogateBitmapResp,
21};
22
/// Routing and timeout parameters for an [`ArrayCoordinator`].
///
/// Derives `Debug` and `Clone` so the parameters can be logged and copied;
/// every field is a plain std value, so both derives are cheap and safe.
#[derive(Debug, Clone)]
pub struct ArrayCoordParams {
    /// Node id passed as `source_node` to every fan-out this coordinator issues.
    pub source_node: u64,
    /// Vshard ids the coordinator fans requests out to.
    pub shard_ids: Vec<u32>,
    /// Fan-out timeout handed to the scatter layer, in milliseconds.
    pub timeout_ms: u64,
    /// Number of leading Hilbert-key bits used to bucket keys onto vshards.
    /// `0` means no per-shard Hilbert range is attached to partitioned requests.
    pub prefix_bits: u8,
    /// Hilbert ranges `(lo, hi)` of the slice being read; copied into every
    /// per-shard slice request.
    pub slice_hilbert_ranges: Vec<(u64, u64)>,
}
39
/// Merged result of a coordinated slice read (see `ArrayCoordinator::coord_slice`).
#[derive(Default, Clone, Debug)]
pub struct CoordSliceResult {
    /// Row payloads as produced by `merge_slice_rows` from every shard response,
    /// using the caller-supplied coordinator limit.
    pub rows: Vec<Vec<u8>>,
    /// `true` when any shard response reported truncation before the horizon
    /// (as computed by `any_truncated_before_horizon_slice`).
    pub truncated_before_horizon: bool,
}
51
52pub(super) fn shard_hilbert_range_for_vshard(shard_id: u32, prefix_bits: u8) -> (u64, u64) {
59 use crate::routing::VSHARD_COUNT;
60 let stride = (VSHARD_COUNT >> (prefix_bits as u32)).max(1);
61 let bucket = shard_id / stride;
62 let shift = 64u8.saturating_sub(prefix_bits);
63 let lo = (bucket as u64) << shift;
64 let hi = if shift == 0 {
65 u64::MAX
66 } else {
67 lo.saturating_add((1u64 << shift).saturating_sub(1))
68 };
69 (lo, hi)
70}
71
72pub struct ArrayCoordinator {
74 pub(super) params: ArrayCoordParams,
75 pub(super) dispatch: Arc<dyn ShardRpcDispatch>,
76 pub(super) circuit_breaker: Arc<CircuitBreaker>,
77}
78
79impl ArrayCoordinator {
80 pub fn new(
81 params: ArrayCoordParams,
82 dispatch: Arc<dyn ShardRpcDispatch>,
83 circuit_breaker: Arc<CircuitBreaker>,
84 ) -> Self {
85 Self {
86 params,
87 dispatch,
88 circuit_breaker,
89 }
90 }
91
92 pub fn for_slice(
100 source_node: u64,
101 timeout_ms: u64,
102 slice_hilbert_ranges: &[(u64, u64)],
103 prefix_bits: u8,
104 total_shards: u32,
105 dispatch: Arc<dyn ShardRpcDispatch>,
106 circuit_breaker: Arc<CircuitBreaker>,
107 ) -> crate::error::Result<Self> {
108 let shard_ids = super::super::routing::array_vshards_for_slice(
109 slice_hilbert_ranges,
110 prefix_bits,
111 total_shards,
112 )?;
113 Ok(Self {
114 params: ArrayCoordParams {
115 source_node,
116 shard_ids,
117 timeout_ms,
118 prefix_bits,
119 slice_hilbert_ranges: slice_hilbert_ranges.to_vec(),
120 },
121 dispatch,
122 circuit_breaker,
123 })
124 }
125
126 pub async fn coord_slice(
140 &self,
141 req: ArrayShardSliceReq,
142 coordinator_limit: u32,
143 ) -> Result<CoordSliceResult> {
144 let prefix_bits = self.params.prefix_bits;
145 let per_shard: Vec<(u32, Vec<u8>)> = self
146 .params
147 .shard_ids
148 .iter()
149 .map(|&shard_id| {
150 let shard_hilbert_range = if prefix_bits > 0 {
151 Some(shard_hilbert_range_for_vshard(shard_id, prefix_bits))
152 } else {
153 None
154 };
155 let per_shard_req = ArrayShardSliceReq {
156 prefix_bits,
157 slice_hilbert_ranges: self.params.slice_hilbert_ranges.clone(),
158 shard_hilbert_range,
159 ..req.clone()
160 };
161 let bytes =
162 zerompk::to_msgpack_vec(&per_shard_req).map_err(|e| ClusterError::Codec {
163 detail: format!("ArrayShardSliceReq serialise: {e}"),
164 })?;
165 Ok((shard_id, bytes))
166 })
167 .collect::<Result<Vec<_>>>()?;
168
169 let fo_params = FanOutPartitionedParams {
170 source_node: self.params.source_node,
171 timeout_ms: self.params.timeout_ms,
172 };
173 let raw = fan_out_partitioned(
174 &fo_params,
175 super::super::opcodes::ARRAY_SHARD_SLICE_REQ,
176 &per_shard,
177 &self.dispatch,
178 &self.circuit_breaker,
179 )
180 .await?;
181 let resps = decode_resps::<ArrayShardSliceResp>(&raw)?;
182 let truncated_before_horizon =
183 super::super::merge::any_truncated_before_horizon_slice(&resps);
184 let rows = merge_slice_rows(&resps, coordinator_limit);
185 Ok(CoordSliceResult {
186 rows,
187 truncated_before_horizon,
188 })
189 }
190
191 pub async fn coord_agg(&self, req: ArrayShardAggReq) -> Result<Vec<ArrayAggPartial>> {
198 let prefix_bits = self.params.prefix_bits;
199 let per_shard: Vec<(u32, Vec<u8>)> = self
200 .params
201 .shard_ids
202 .iter()
203 .map(|&shard_id| {
204 let hilbert_range = if prefix_bits > 0 {
205 Some(shard_hilbert_range_for_vshard(shard_id, prefix_bits))
206 } else {
207 None
208 };
209 let per_shard_req = ArrayShardAggReq {
210 shard_hilbert_range: hilbert_range,
211 ..req.clone()
212 };
213 let bytes =
214 zerompk::to_msgpack_vec(&per_shard_req).map_err(|e| ClusterError::Codec {
215 detail: format!("ArrayShardAggReq serialise: {e}"),
216 })?;
217 Ok((shard_id, bytes))
218 })
219 .collect::<Result<Vec<_>>>()?;
220
221 let fo_params = FanOutPartitionedParams {
222 source_node: self.params.source_node,
223 timeout_ms: self.params.timeout_ms,
224 };
225 let raw = fan_out_partitioned(
226 &fo_params,
227 super::super::opcodes::ARRAY_SHARD_AGG_REQ,
228 &per_shard,
229 &self.dispatch,
230 &self.circuit_breaker,
231 )
232 .await?;
233 let resps = decode_resps::<ArrayShardAggResp>(&raw)?;
234 Ok(reduce_agg_partials(&resps))
235 }
236
237 pub async fn coord_delete(
239 &self,
240 req: ArrayShardDeleteReq,
241 ) -> Result<Vec<ArrayShardDeleteResp>> {
242 let req_bytes = zerompk::to_msgpack_vec(&req).map_err(|e| ClusterError::Codec {
243 detail: format!("ArrayShardDeleteReq serialise: {e}"),
244 })?;
245 let raw = fan_out(
246 &self.fan_out_params(),
247 super::super::opcodes::ARRAY_SHARD_DELETE_REQ,
248 &req_bytes,
249 &self.dispatch,
250 &self.circuit_breaker,
251 )
252 .await?;
253 decode_resps::<ArrayShardDeleteResp>(&raw)
254 }
255
256 pub async fn coord_surrogate_bitmap_scan(
261 &self,
262 req: ArrayShardSurrogateBitmapReq,
263 ) -> Result<Vec<ArrayShardSurrogateBitmapResp>> {
264 let req_bytes = zerompk::to_msgpack_vec(&req).map_err(|e| ClusterError::Codec {
265 detail: format!("ArrayShardSurrogateBitmapReq serialise: {e}"),
266 })?;
267 let raw = fan_out(
268 &self.fan_out_params(),
269 super::super::opcodes::ARRAY_SHARD_SURROGATE_BITMAP_REQ,
270 &req_bytes,
271 &self.dispatch,
272 &self.circuit_breaker,
273 )
274 .await?;
275 decode_resps::<ArrayShardSurrogateBitmapResp>(&raw)
276 }
277
278 pub(super) fn fan_out_params(&self) -> FanOutParams {
279 FanOutParams {
280 shard_ids: self.params.shard_ids.clone(),
281 timeout_ms: self.params.timeout_ms,
282 source_node: self.params.source_node,
283 }
284 }
285}
286
287pub(super) fn decode_resps<T>(raw: &[(u32, Vec<u8>)]) -> Result<Vec<T>>
289where
290 T: for<'a> zerompk::FromMessagePack<'a>,
291{
292 raw.iter()
293 .map(|(_, bytes)| {
294 zerompk::from_msgpack(bytes).map_err(|e| ClusterError::Codec {
295 detail: format!("array response deserialise: {e}"),
296 })
297 })
298 .collect()
299}