// nodedb_cluster/distributed_vector/seam.rs
1// SPDX-License-Identifier: BUSL-1.1
2
3//! Trait seam for distributed vector algorithms.
4//!
5//! `VectorShardSeam` defines the cross-shard exchange surface that advanced
6//! distributed vector search algorithms require. The current production
7//! implementation uses flat scatter-gather (`VectorScatterGather`), which
8//! corresponds to `select_shards` returning all shards and the other methods
9//! being no-ops. The trait reserves the shape so future algorithms can be
10//! added by implementing a concrete type without a protocol break.
11//!
12//! # Algorithm mapping
13//!
14//! ## Compass (coarse-code multi-stage routing)
15//!
16//! Compass performs a global coarse-code lookup before the fine search to
17//! select only the most relevant shards. The coordinator calls
18//! `select_shards` with the query to obtain a pruned shard subset (instead
19//! of fanning out to every shard), then issues a standard k-NN scatter
20//! against that subset. The wire messages `VectorCoarseRouteRequest` and
21//! `VectorCoarseRouteResponse` carry the per-shard coarse descriptor used
22//! by `select_shards` to make the pruning decision.
23//!
24//! Methods used: `select_shards`.
25//! Default behaviour: all shards selected (equivalent to current scatter-gather).
26//!
27//! ## SPIRE (Shared Per-shard Inverted Residual Encoding)
28//!
29//! SPIRE requires each shard to share its IVF centroid table with peers at
30//! index-build time so queries can leverage cross-shard centroid knowledge.
31//! This exchange is initiated shard-to-shard (not coordinator-mediated) and
32//! happens once per build, not per query. `build_time_exchange` is the hook
33//! for this; `direct_message` is the transport primitive it calls.
34//!
35//! Methods used: `build_time_exchange`, `direct_message`.
//! Default behaviour: no-op build exchange; `direct_message` routes through
//! the coordinator via the supplied `ShardRpcDispatch`. Implementations for
//! which direct messaging is structurally impossible return
//! `Err(VectorSeamError::DirectMessagingUnsupported)`.
39//!
40//! ## CoTra-RDMA (Cooperative Traversal over RDMA)
41//!
42//! CoTra-RDMA performs one-sided RDMA READs directly into a remote shard's
43//! HNSW graph without involving the remote CPU. The reading shard first calls
44//! `exposed_region` on the remote shard descriptor to obtain the registered
45//! memory region (address + rkey); it then issues one-sided reads at the
46//! hardware level. When RDMA is unavailable, `exposed_region` returns `None`
47//! and the algorithm falls back to standard scatter-gather.
48//!
49//! Methods used: `exposed_region`.
50//! Default behaviour: `None` — RDMA direct reads not supported by this impl.
51
52use crate::distributed_array::rpc::ShardRpcDispatch;
53use crate::error::{ClusterError, Result};
54use crate::wire::VShardEnvelope;
55
/// Opaque payload exchanged between shards for algorithm-specific messages.
///
/// The contents are algorithm-defined and serialized with zerompk. The seam
/// trait does not interpret the payload; algorithms define their own types
/// and encode/decode them.
#[derive(Debug, Clone)]
pub struct ShardMessage {
    /// Discriminant identifying which algorithm/phase produced this message.
    /// Determines the wire `VShardMessageType` chosen by
    /// `build_message_envelope`.
    pub kind: ShardMessageKind,
    /// zerompk-encoded algorithm-specific payload. Forwarded byte-for-byte
    /// into the wire envelope; may be empty.
    pub payload: Vec<u8>,
}
68
/// Discriminant for `ShardMessage` payloads.
///
/// Each variant maps one-to-one onto a wire `VShardMessageType` in
/// `build_message_envelope`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShardMessageKind {
    /// Compass phase-1 coarse descriptor request/response.
    CompassCoarseDescriptor,
    /// SPIRE build-time IVF centroid table exchange.
    SpireCentroidTable,
}
77
/// Reply to a `direct_message` call.
///
/// The default `direct_message` implementation copies the payload verbatim
/// from the reply envelope; callers decode it with their algorithm-specific
/// type.
#[derive(Debug, Clone)]
pub struct ShardMessageReply {
    /// zerompk-encoded reply payload. Empty for fire-and-forget messages.
    pub payload: Vec<u8>,
}
84
/// Errors specific to `VectorShardSeam` operations.
#[derive(Debug, thiserror::Error)]
pub enum VectorSeamError {
    /// The implementation has no shard-to-shard transport; callers should
    /// fall back to coordinator-mediated RPC.
    #[error("direct shard-to-shard messaging is not supported by this implementation")]
    DirectMessagingUnsupported,
    /// A build-time exchange (e.g. SPIRE centroid sharing) with
    /// `peer_shard` failed; `detail` carries a human-readable cause.
    #[error("build-time exchange failed for shard {peer_shard}: {detail}")]
    BuildExchangeFailed { peer_shard: u32, detail: String },
    /// Underlying cluster transport failure, converted from `ClusterError`
    /// via `#[from]` (used by the default `direct_message`).
    #[error("cluster transport error during seam call: {0}")]
    Transport(#[from] ClusterError),
}
95
/// A registered memory region exposed for one-sided (RDMA) reads.
///
/// When `exposed_region` returns `Some(MemoryRegion)`, the receiving side
/// may read `len` bytes starting at `remote_addr` using RDMA with the
/// provided `rkey`. No CPU involvement on the target shard is required.
///
/// When RDMA hardware or software support is absent, `exposed_region`
/// returns `None` and callers must fall back to standard RPC-based access.
///
/// The struct is `Copy`, so descriptors can be passed around cheaply by
/// value.
#[derive(Debug, Clone, Copy)]
pub struct MemoryRegion {
    /// Remote virtual address of the pinned region.
    pub remote_addr: u64,
    /// RDMA remote key authorizing reads of this region.
    pub rkey: u32,
    /// Length of the region in bytes; reads must stay within this bound.
    pub len: usize,
}
113
/// A reference to a peer shard used as a target for seam calls.
///
/// Derives `Hash`/`Eq`, so it can serve as a key in per-peer state tables.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ShardRef {
    /// The peer node ID.
    pub node_id: u64,
    /// The vShard ID on that node.
    pub vshard_id: u32,
}
122
/// Subset of shards selected for a query phase.
///
/// `All` means the caller should contact every shard in the collection —
/// identical to current scatter-gather behaviour. `Subset` carries a pruned
/// list; the coordinator fans out only to those shards.
#[derive(Debug, Clone)]
pub enum ShardSubset {
    /// Contact all shards in the collection (default scatter-gather).
    All,
    /// Contact only this subset of shard IDs.
    Subset(Vec<u32>),
}

impl ShardSubset {
    /// Resolve the subset against a full shard list.
    ///
    /// Returns the shards to actually contact: the full list for `All`, or
    /// the stored subset verbatim for `Subset`.
    ///
    /// NOTE: the stored subset is NOT filtered against `all_shards` — IDs
    /// absent from `all_shards` are returned as-is, and an empty subset
    /// resolves to an empty slice. Producers of a `Subset` (e.g. a Compass
    /// `select_shards` implementation) are responsible for emitting only
    /// valid shard IDs.
    #[must_use]
    pub fn resolve<'a>(&'a self, all_shards: &'a [u32]) -> &'a [u32] {
        match self {
            ShardSubset::All => all_shards,
            ShardSubset::Subset(ids) => ids.as_slice(),
        }
    }
}
148
149/// Cross-shard exchange seam for distributed vector search algorithms.
150///
151/// The default implementations preserve the current flat scatter-gather
152/// behaviour. Concrete types implementing advanced algorithms (Compass,
153/// SPIRE, CoTra-RDMA) override only the methods they need. Existing call
154/// sites that construct `VectorScatterGather` directly are unaffected.
155///
156/// # Plane safety
157///
158/// This trait is `Send + Sync + 'static` so it can be held by the Control
159/// Plane coordinator. Methods that touch the Data Plane must do so via the
160/// existing SPSC bridge — they must not perform storage I/O directly.
161pub trait VectorShardSeam: Send + Sync + 'static {
162 /// Select the subset of shards to contact for a query.
163 ///
164 /// Default: `ShardSubset::All` — equivalent to current scatter-gather.
165 ///
166 /// Compass overrides this to perform a coarse-code lookup against
167 /// previously obtained shard descriptors and return a pruned
168 /// `ShardSubset::Subset`. The coordinator calls this before building
169 /// scatter envelopes.
170 fn select_shards(&self, _query_vector: &[f32], _all_shards: &[u32]) -> ShardSubset {
171 ShardSubset::All
172 }
173
174 /// Build-time exchange with a peer shard.
175 ///
176 /// Default: no-op — returns `Ok(())` immediately.
177 ///
178 /// SPIRE overrides this to send the local shard's IVF centroid table to
179 /// `peer` via `direct_message` so the peer can build cross-shard centroid
180 /// knowledge. Called once per peer at index-build time, not per query.
181 ///
182 /// Implementors must not block the Tokio thread pool for heavy work;
183 /// offload encoding to `spawn_blocking` if the payload is large.
184 fn build_time_exchange(&self, _peer: ShardRef, _dispatch: &dyn ShardRpcDispatch) -> Result<()> {
185 Ok(())
186 }
187
188 /// Send a message directly to a peer shard and await its reply.
189 ///
190 /// Default: routes through `dispatch` (coordinator-mediated QUIC) if
191 /// provided, making it equivalent to a normal RPC call. Algorithms that
192 /// need genuinely shard-to-shard messaging (without coordinator
193 /// involvement) implement this method with a direct connection pool.
194 ///
195 /// When direct messaging is structurally impossible for the given
196 /// implementation, return
197 /// `Err(VectorSeamError::DirectMessagingUnsupported)`.
198 ///
199 /// # Wire mapping
200 ///
201 /// The `msg` payload is wrapped in a `VShardEnvelope` using the
202 /// appropriate `VShardMessageType` for `msg.kind` before dispatch.
203 fn direct_message(
204 &self,
205 peer: ShardRef,
206 msg: ShardMessage,
207 dispatch: &dyn ShardRpcDispatch,
208 source_node: u64,
209 ) -> impl std::future::Future<Output = std::result::Result<ShardMessageReply, VectorSeamError>> + Send
210 {
211 let envelope = build_message_envelope(source_node, peer, &msg);
212 async move {
213 let reply = dispatch
214 .call(envelope, 5_000)
215 .await
216 .map_err(VectorSeamError::Transport)?;
217 Ok(ShardMessageReply {
218 payload: reply.payload,
219 })
220 }
221 }
222
223 /// Return the memory region this shard exposes for one-sided reads, if any.
224 ///
225 /// Default: `None` — RDMA direct reads are not supported.
226 ///
227 /// CoTra-RDMA overrides this to return the registered pinned region for
228 /// the HNSW graph. The calling shard uses the returned `MemoryRegion` to
229 /// issue one-sided RDMA READs without involving this shard's CPU. When
230 /// `None` is returned, callers fall back to coordinator-mediated RPC.
231 fn exposed_region(&self) -> Option<MemoryRegion> {
232 None
233 }
234}
235
236/// Encode a `ShardMessage` into a `VShardEnvelope` for dispatch.
237fn build_message_envelope(source_node: u64, peer: ShardRef, msg: &ShardMessage) -> VShardEnvelope {
238 use crate::wire::{VShardEnvelope, VShardMessageType, WIRE_VERSION};
239 let msg_type = match msg.kind {
240 ShardMessageKind::CompassCoarseDescriptor => VShardMessageType::VectorCoarseRouteRequest,
241 ShardMessageKind::SpireCentroidTable => VShardMessageType::VectorBuildExchangeRequest,
242 };
243 VShardEnvelope {
244 version: WIRE_VERSION,
245 msg_type,
246 source_node,
247 target_node: peer.node_id,
248 vshard_id: peer.vshard_id,
249 payload: msg.payload.clone(),
250 }
251}
252
#[cfg(test)]
mod tests {
    use super::*;

    // Seam type exercising every default method body, none overridden.
    struct DefaultSeam;
    impl VectorShardSeam for DefaultSeam {}

    #[test]
    fn default_select_shards_returns_all() {
        let shards = [0u32, 1, 2, 3];
        let selection = DefaultSeam.select_shards(&[0.1, 0.2], &shards);
        assert!(matches!(selection, ShardSubset::All));
        assert_eq!(selection.resolve(&shards), &shards);
    }

    #[test]
    fn default_exposed_region_is_none() {
        assert!(DefaultSeam.exposed_region().is_none());
    }

    #[test]
    fn default_build_time_exchange_is_noop() {
        // Dispatch that fails every call, proving the default never uses it.
        struct MockDispatch;
        #[async_trait::async_trait]
        impl crate::distributed_array::rpc::ShardRpcDispatch for MockDispatch {
            async fn call(
                &self,
                _req: VShardEnvelope,
                _timeout_ms: u64,
            ) -> crate::error::Result<VShardEnvelope> {
                Err(crate::error::ClusterError::Transport {
                    detail: "mock".into(),
                })
            }
        }
        let target = ShardRef {
            node_id: 2,
            vshard_id: 5,
        };
        // Default build_time_exchange is a no-op — never calls dispatch.
        let outcome = DefaultSeam.build_time_exchange(target, &MockDispatch);
        assert!(outcome.is_ok());
    }

    #[test]
    fn shard_subset_resolve_subset() {
        let shards = [0u32, 1, 2, 3, 4];
        let pruned = ShardSubset::Subset(vec![1, 3]);
        assert_eq!(pruned.resolve(&shards), &[1u32, 3]);
    }

    #[test]
    fn shard_subset_resolve_all() {
        let shards = [0u32, 1, 2];
        assert_eq!(ShardSubset::All.resolve(&shards), &[0u32, 1, 2]);
    }

    #[test]
    fn memory_region_fields() {
        let region = MemoryRegion {
            remote_addr: 0xDEAD_BEEF_0000_0000,
            rkey: 42,
            len: 1024 * 1024,
        };
        assert_eq!(region.rkey, 42);
        assert_eq!(region.len, 1024 * 1024);
    }

    #[test]
    fn build_message_envelope_compass() {
        use crate::wire::VShardMessageType;
        let msg = ShardMessage {
            kind: ShardMessageKind::CompassCoarseDescriptor,
            payload: vec![1, 2, 3],
        };
        let target = ShardRef {
            node_id: 7,
            vshard_id: 3,
        };
        let envelope = build_message_envelope(1, target, &msg);
        assert_eq!(envelope.msg_type, VShardMessageType::VectorCoarseRouteRequest);
        assert_eq!(envelope.target_node, 7);
        assert_eq!(envelope.vshard_id, 3);
        assert_eq!(envelope.payload, vec![1, 2, 3]);
    }

    #[test]
    fn build_message_envelope_spire() {
        use crate::wire::VShardMessageType;
        let msg = ShardMessage {
            kind: ShardMessageKind::SpireCentroidTable,
            payload: vec![0xFF, 0xAA],
        };
        let target = ShardRef {
            node_id: 9,
            vshard_id: 1,
        };
        let envelope = build_message_envelope(2, target, &msg);
        assert_eq!(envelope.msg_type, VShardMessageType::VectorBuildExchangeRequest);
        assert_eq!(envelope.source_node, 2);
    }
}
359}