
nodedb_cluster/distributed_array/coordinator/read.rs

// SPDX-License-Identifier: BUSL-1.1

//! Read-path coordinator for distributed array operations.
//!
//! [`ArrayCoordinator`] drives fan-out reads (`coord_slice`, `coord_agg`,
//! `coord_surrogate_bitmap_scan`) to the set of vShards whose Hilbert range
//! overlaps the slice predicate.

use std::sync::Arc;

use crate::circuit_breaker::CircuitBreaker;
use crate::error::{ClusterError, Result};

use super::super::merge::{ArrayAggPartial, merge_slice_rows, reduce_agg_partials};
use super::super::rpc::ShardRpcDispatch;
use super::super::scatter::{FanOutParams, FanOutPartitionedParams, fan_out, fan_out_partitioned};
use super::super::wire::{
    ArrayShardAggReq, ArrayShardAggResp, ArrayShardDeleteReq, ArrayShardDeleteResp,
    ArrayShardSliceReq, ArrayShardSliceResp, ArrayShardSurrogateBitmapReq,
    ArrayShardSurrogateBitmapResp,
};

/// Parameters common to read-path coordinator entry points (broadcast fan-out).
pub struct ArrayCoordParams {
    pub source_node: u64,
    /// Pre-computed target shard IDs (overlapping shards for reads).
    pub shard_ids: Vec<u32>,
    /// Per-shard RPC timeout in milliseconds.
    pub timeout_ms: u64,
    /// Hilbert routing granularity (1–16). 0 means no shard-side routing
    /// validation (e.g. when the coordinator was constructed without slice
    /// range information, as in tests or unbounded scans).
    pub prefix_bits: u8,
    /// Inclusive Hilbert-prefix ranges `(lo, hi)` that this read covers.
    /// Forwarded to each shard so it can verify it still owns the range.
    /// Empty means unbounded — the shard skips routing validation.
    pub slice_hilbert_ranges: Vec<(u64, u64)>,
}

/// Result of a coordinated slice fan-out.
///
/// Carries the merged shard rows together with the OR-reduced
/// `truncated_before_horizon` flag so the upstream caller can surface a
/// below-horizon warning to the client. Mirrors the single-node
/// `ArraySliceResponse` shape so downstream encoding is symmetric.
#[derive(Debug, Clone, Default)]
pub struct CoordSliceResult {
    pub rows: Vec<Vec<u8>>,
    pub truncated_before_horizon: bool,
}

/// Compute the inclusive Hilbert-prefix range `[lo, hi]` that vShard `shard_id`
/// owns given the array's routing granularity `prefix_bits`.
///
/// Each bucket `b = shard_id / stride` covers the Hilbert range
/// `[b << (64 - prefix_bits), ((b + 1) << (64 - prefix_bits)) - 1]`.
/// The stride is `VSHARD_COUNT >> prefix_bits` (floored at 1).
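///
/// # Example
///
/// A worked illustration, assuming for the numbers only that `VSHARD_COUNT`
/// is 1024: with `prefix_bits = 4` the stride is `1024 >> 4 = 64`, so
/// `shard_id = 130` falls in bucket `130 / 64 = 2` and owns
/// `[2 << 60, (3 << 60) - 1]`.
///
/// ```ignore
/// // Hypothetical values; actual results depend on VSHARD_COUNT.
/// let (lo, hi) = shard_hilbert_range_for_vshard(130, 4);
/// assert_eq!(lo, 2u64 << 60);
/// assert_eq!(hi, (3u64 << 60) - 1);
/// ```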
pub(super) fn shard_hilbert_range_for_vshard(shard_id: u32, prefix_bits: u8) -> (u64, u64) {
    use crate::routing::VSHARD_COUNT;
    let stride = (VSHARD_COUNT >> (prefix_bits as u32)).max(1);
    let bucket = shard_id / stride;
    let shift = 64u8.saturating_sub(prefix_bits);
    let lo = (bucket as u64) << shift;
    let hi = if shift == 0 {
        u64::MAX
    } else {
        lo.saturating_add((1u64 << shift).saturating_sub(1))
    };
    (lo, hi)
}

/// Coordinator for distributed array read operations.
pub struct ArrayCoordinator {
    pub(super) params: ArrayCoordParams,
    pub(super) dispatch: Arc<dyn ShardRpcDispatch>,
    pub(super) circuit_breaker: Arc<CircuitBreaker>,
}

impl ArrayCoordinator {
    pub fn new(
        params: ArrayCoordParams,
        dispatch: Arc<dyn ShardRpcDispatch>,
        circuit_breaker: Arc<CircuitBreaker>,
    ) -> Self {
        Self {
            params,
            dispatch,
            circuit_breaker,
        }
    }

    /// Construct an `ArrayCoordinator` whose target shards are computed from
    /// the Hilbert-prefix ranges that overlap a slice predicate.
    ///
    /// `slice_hilbert_ranges` — `(lo, hi)` pairs computed by the planner from
    /// the `Slice` predicate. Pass an empty slice for an unbounded scan.
    /// `prefix_bits` — the array's routing granularity from the catalog entry.
    /// `total_shards` — the number of active vShards in the cluster.
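    ///
    /// # Example
    ///
    /// A minimal sketch; `dispatch` and `circuit_breaker` are assumed to be
    /// pre-built `Arc` handles and the literal values are illustrative only.
    ///
    /// ```ignore
    /// let coordinator = ArrayCoordinator::for_slice(
    ///     source_node,
    ///     5_000,                           // per-shard timeout in ms
    ///     &[(0x0, 0x0FFF_FFFF_FFFF_FFFF)], // planner-computed Hilbert ranges
    ///     4,                               // prefix_bits from the catalog entry
    ///     total_shards,
    ///     dispatch.clone(),
    ///     circuit_breaker.clone(),
    /// )?;
    /// ```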
    pub fn for_slice(
        source_node: u64,
        timeout_ms: u64,
        slice_hilbert_ranges: &[(u64, u64)],
        prefix_bits: u8,
        total_shards: u32,
        dispatch: Arc<dyn ShardRpcDispatch>,
        circuit_breaker: Arc<CircuitBreaker>,
    ) -> crate::error::Result<Self> {
        let shard_ids = super::super::routing::array_vshards_for_slice(
            slice_hilbert_ranges,
            prefix_bits,
            total_shards,
        )?;
        Ok(Self {
            params: ArrayCoordParams {
                source_node,
                shard_ids,
                timeout_ms,
                prefix_bits,
                slice_hilbert_ranges: slice_hilbert_ranges.to_vec(),
            },
            dispatch,
            circuit_breaker,
        })
    }

    /// Fan out a coord-range slice to all target shards and merge the rows.
    ///
    /// Each shard receives the full slice request with the caller-supplied
    /// `limit` pushed down so shards can stop scanning early. The coordinator
    /// stamps a per-shard `shard_hilbert_range` so each shard only returns
    /// cells whose Hilbert prefix falls within its owned range, preventing
    /// duplicate rows in single-node harnesses where all vShards share one
    /// Data Plane. The coordinator then applies `coordinator_limit` as a
    /// final cut-off on the merged result.
    ///
    /// Returns merged rows plus the OR-reduced `truncated_before_horizon`
    /// flag across all shards. If any shard fails, the entire operation
    /// returns `Err`; partial results are not silently dropped.
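    ///
    /// # Example
    ///
    /// A sketch of the caller side, assuming `req` is a fully populated
    /// `ArrayShardSliceReq` built by the planner and 1_000 is the pushed-down
    /// limit:
    ///
    /// ```ignore
    /// let result = coordinator.coord_slice(req, 1_000).await?;
    /// if result.truncated_before_horizon {
    ///     // Surface a below-horizon warning to the client.
    /// }
    /// for row in &result.rows {
    ///     // Each row is the encoded row bytes returned by the shards.
    /// }
    /// ```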
    pub async fn coord_slice(
        &self,
        req: ArrayShardSliceReq,
        coordinator_limit: u32,
    ) -> Result<CoordSliceResult> {
        let prefix_bits = self.params.prefix_bits;
        let per_shard: Vec<(u32, Vec<u8>)> = self
            .params
            .shard_ids
            .iter()
            .map(|&shard_id| {
                let shard_hilbert_range = if prefix_bits > 0 {
                    Some(shard_hilbert_range_for_vshard(shard_id, prefix_bits))
                } else {
                    None
                };
                let per_shard_req = ArrayShardSliceReq {
                    prefix_bits,
                    slice_hilbert_ranges: self.params.slice_hilbert_ranges.clone(),
                    shard_hilbert_range,
                    ..req.clone()
                };
                let bytes =
                    zerompk::to_msgpack_vec(&per_shard_req).map_err(|e| ClusterError::Codec {
                        detail: format!("ArrayShardSliceReq serialise: {e}"),
                    })?;
                Ok((shard_id, bytes))
            })
            .collect::<Result<Vec<_>>>()?;

        let fo_params = FanOutPartitionedParams {
            source_node: self.params.source_node,
            timeout_ms: self.params.timeout_ms,
        };
        let raw = fan_out_partitioned(
            &fo_params,
            super::super::opcodes::ARRAY_SHARD_SLICE_REQ,
            &per_shard,
            &self.dispatch,
            &self.circuit_breaker,
        )
        .await?;
        let resps = decode_resps::<ArrayShardSliceResp>(&raw)?;
        let truncated_before_horizon =
            super::super::merge::any_truncated_before_horizon_slice(&resps);
        let rows = merge_slice_rows(&resps, coordinator_limit);
        Ok(CoordSliceResult {
            rows,
            truncated_before_horizon,
        })
    }

    /// Fan out an aggregate request and reduce partial aggregates from all shards.
    ///
    /// Each shard receives its own `shard_hilbert_range` so it can apply a
    /// Hilbert-prefix pre-filter and only count cells in its partition. This
    /// prevents double-counting in configurations where multiple vShards share
    /// a single Data Plane executor (e.g. single-node harnesses).
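    ///
    /// # Example
    ///
    /// A sketch of the caller side, assuming `req` is a fully populated
    /// `ArrayShardAggReq`; how the partials are finalised is up to the caller:
    ///
    /// ```ignore
    /// let partials: Vec<ArrayAggPartial> = coordinator.coord_agg(req).await?;
    /// // The caller folds the partials into final aggregate values before
    /// // encoding the response.
    /// ```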
    pub async fn coord_agg(&self, req: ArrayShardAggReq) -> Result<Vec<ArrayAggPartial>> {
        let prefix_bits = self.params.prefix_bits;
        let per_shard: Vec<(u32, Vec<u8>)> = self
            .params
            .shard_ids
            .iter()
            .map(|&shard_id| {
                let hilbert_range = if prefix_bits > 0 {
                    Some(shard_hilbert_range_for_vshard(shard_id, prefix_bits))
                } else {
                    None
                };
                let per_shard_req = ArrayShardAggReq {
                    shard_hilbert_range: hilbert_range,
                    ..req.clone()
                };
                let bytes =
                    zerompk::to_msgpack_vec(&per_shard_req).map_err(|e| ClusterError::Codec {
                        detail: format!("ArrayShardAggReq serialise: {e}"),
                    })?;
                Ok((shard_id, bytes))
            })
            .collect::<Result<Vec<_>>>()?;

        let fo_params = FanOutPartitionedParams {
            source_node: self.params.source_node,
            timeout_ms: self.params.timeout_ms,
        };
        let raw = fan_out_partitioned(
            &fo_params,
            super::super::opcodes::ARRAY_SHARD_AGG_REQ,
            &per_shard,
            &self.dispatch,
            &self.circuit_breaker,
        )
        .await?;
        let resps = decode_resps::<ArrayShardAggResp>(&raw)?;
        Ok(reduce_agg_partials(&resps))
    }

    /// Forward a coord-based delete to the shard(s) that own the cells.
    pub async fn coord_delete(
        &self,
        req: ArrayShardDeleteReq,
    ) -> Result<Vec<ArrayShardDeleteResp>> {
        let req_bytes = zerompk::to_msgpack_vec(&req).map_err(|e| ClusterError::Codec {
            detail: format!("ArrayShardDeleteReq serialise: {e}"),
        })?;
        let raw = fan_out(
            &self.fan_out_params(),
            super::super::opcodes::ARRAY_SHARD_DELETE_REQ,
            &req_bytes,
            &self.dispatch,
            &self.circuit_breaker,
        )
        .await?;
        decode_resps::<ArrayShardDeleteResp>(&raw)
    }

    /// Fan out a surrogate bitmap scan and collect the per-shard bitmap
    /// responses.
    ///
    /// Returns one `ArrayShardSurrogateBitmapResp` per target shard; the
    /// caller decodes each shard's bitmap bytes and unions them into a single
    /// `SurrogateBitmap` covering all shards.
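    ///
    /// # Example
    ///
    /// A hypothetical caller-side sketch; the field and helper names
    /// (`bitmap_bytes`, `SurrogateBitmap::from_bytes`, `union_with`) are
    /// illustrative, not the actual wire or API names.
    ///
    /// ```ignore
    /// let resps = coordinator.coord_surrogate_bitmap_scan(req).await?;
    /// let mut union = SurrogateBitmap::default();
    /// for resp in &resps {
    ///     union.union_with(&SurrogateBitmap::from_bytes(&resp.bitmap_bytes)?);
    /// }
    /// ```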
    pub async fn coord_surrogate_bitmap_scan(
        &self,
        req: ArrayShardSurrogateBitmapReq,
    ) -> Result<Vec<ArrayShardSurrogateBitmapResp>> {
        let req_bytes = zerompk::to_msgpack_vec(&req).map_err(|e| ClusterError::Codec {
            detail: format!("ArrayShardSurrogateBitmapReq serialise: {e}"),
        })?;
        let raw = fan_out(
            &self.fan_out_params(),
            super::super::opcodes::ARRAY_SHARD_SURROGATE_BITMAP_REQ,
            &req_bytes,
            &self.dispatch,
            &self.circuit_breaker,
        )
        .await?;
        decode_resps::<ArrayShardSurrogateBitmapResp>(&raw)
    }

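    /// Build broadcast fan-out parameters covering all of this coordinator's
    /// target shards.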
    pub(super) fn fan_out_params(&self) -> FanOutParams {
        FanOutParams {
            shard_ids: self.params.shard_ids.clone(),
            timeout_ms: self.params.timeout_ms,
            source_node: self.params.source_node,
        }
    }
}

/// Deserialise a slice of raw `(shard_id, bytes)` pairs into typed responses.
pub(super) fn decode_resps<T>(raw: &[(u32, Vec<u8>)]) -> Result<Vec<T>>
where
    T: for<'a> zerompk::FromMessagePack<'a>,
{
    raw.iter()
        .map(|(_, bytes)| {
            zerompk::from_msgpack(bytes).map_err(|e| ClusterError::Codec {
                detail: format!("array response deserialise: {e}"),
            })
        })
        .collect()
}