Skip to main content

nodedb_cluster/distributed_vector/
seam.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Trait seam for distributed vector algorithms.
4//!
5//! `VectorShardSeam` defines the cross-shard exchange surface that advanced
6//! distributed vector search algorithms require. The current production
7//! implementation uses flat scatter-gather (`VectorScatterGather`), which
8//! corresponds to `select_shards` returning all shards and the other methods
9//! being no-ops. The trait reserves the shape so future algorithms can be
10//! added by implementing a concrete type without a protocol break.
11//!
12//! # Algorithm mapping
13//!
14//! ## Compass (coarse-code multi-stage routing)
15//!
16//! Compass performs a global coarse-code lookup before the fine search to
17//! select only the most relevant shards. The coordinator calls
18//! `select_shards` with the query to obtain a pruned shard subset (instead
19//! of fanning out to every shard), then issues a standard k-NN scatter
20//! against that subset. The wire messages `VectorCoarseRouteRequest` and
21//! `VectorCoarseRouteResponse` carry the per-shard coarse descriptor used
22//! by `select_shards` to make the pruning decision.
23//!
24//! Methods used: `select_shards`.
25//! Default behaviour: all shards selected (equivalent to current scatter-gather).
26//!
27//! ## SPIRE (Shared Per-shard Inverted Residual Encoding)
28//!
29//! SPIRE requires each shard to share its IVF centroid table with peers at
30//! index-build time so queries can leverage cross-shard centroid knowledge.
31//! This exchange is initiated shard-to-shard (not coordinator-mediated) and
32//! happens once per build, not per query. `build_time_exchange` is the hook
33//! for this; `direct_message` is the transport primitive it calls.
34//!
35//! Methods used: `build_time_exchange`, `direct_message`.
36//! Default behaviour: no-op build exchange; `direct_message` routes through
37//! the coordinator if a `ShardRpcDispatch` is available, otherwise returns
38//! `Err(VectorSeamError::DirectMessagingUnsupported)`.
39//!
40//! ## CoTra-RDMA (Cooperative Traversal over RDMA)
41//!
42//! CoTra-RDMA performs one-sided RDMA READs directly into a remote shard's
43//! HNSW graph without involving the remote CPU. The reading shard first calls
44//! `exposed_region` on the remote shard descriptor to obtain the registered
45//! memory region (address + rkey); it then issues one-sided reads at the
46//! hardware level. When RDMA is unavailable, `exposed_region` returns `None`
47//! and the algorithm falls back to standard scatter-gather.
48//!
49//! Methods used: `exposed_region`.
50//! Default behaviour: `None` — RDMA direct reads not supported by this impl.
51
52use crate::distributed_array::rpc::ShardRpcDispatch;
53use crate::error::{ClusterError, Result};
54use crate::wire::VShardEnvelope;
55
/// Opaque payload exchanged between shards for algorithm-specific messages.
///
/// The contents are algorithm-defined and serialized with zerompk. The seam
/// trait does not interpret the payload; algorithms define their own types
/// and encode/decode them.
#[derive(Debug, Clone)]
pub struct ShardMessage {
    /// Discriminant identifying which algorithm/phase produced this message.
    /// Drives the `VShardMessageType` selection when the message is wrapped
    /// in an envelope by `build_message_envelope`.
    pub kind: ShardMessageKind,
    /// zerompk-encoded algorithm-specific payload. Never inspected by the
    /// seam itself; copied verbatim into the outgoing envelope.
    pub payload: Vec<u8>,
}
68
/// Discriminant for `ShardMessage` payloads.
///
/// Adding a variant here requires a corresponding arm in the wire mapping
/// inside `build_message_envelope`; the match there is exhaustive, so the
/// compiler will flag the missing arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShardMessageKind {
    /// Compass phase-1 coarse descriptor request/response.
    CompassCoarseDescriptor,
    /// SPIRE build-time IVF centroid table exchange.
    SpireCentroidTable,
}
77
/// Reply to a `direct_message` call.
#[derive(Debug, Clone)]
pub struct ShardMessageReply {
    /// zerompk-encoded reply payload. Empty for fire-and-forget messages.
    /// Populated from the `payload` field of the reply `VShardEnvelope`.
    pub payload: Vec<u8>,
}
84
/// Errors specific to `VectorShardSeam` operations.
#[derive(Debug, thiserror::Error)]
pub enum VectorSeamError {
    /// Returned by `direct_message` implementations for which genuine
    /// shard-to-shard messaging is structurally impossible.
    #[error("direct shard-to-shard messaging is not supported by this implementation")]
    DirectMessagingUnsupported,
    /// A SPIRE-style build-time exchange with `peer_shard` failed.
    ///
    /// NOTE(review): `build_time_exchange` currently returns the crate-level
    /// `Result` (i.e. `ClusterError`), so this variant is not reachable from
    /// that method's signature as written — confirm the intended error
    /// plumbing for build-time failures.
    #[error("build-time exchange failed for shard {peer_shard}: {detail}")]
    BuildExchangeFailed { peer_shard: u32, detail: String },
    /// Underlying cluster transport failure surfaced through the seam
    /// (converted from `ClusterError` via `#[from]`).
    #[error("cluster transport error during seam call: {0}")]
    Transport(#[from] ClusterError),
}
95
/// A registered memory region exposed for one-sided (RDMA) reads.
///
/// When `exposed_region` returns `Some(MemoryRegion)`, the receiving side
/// may read `len` bytes starting at `remote_addr` using RDMA with the
/// provided `rkey`. No CPU involvement on the target shard is required.
///
/// When RDMA hardware or software support is absent, `exposed_region`
/// returns `None` and callers must fall back to standard RPC-based access.
#[derive(Debug, Clone, Copy)]
pub struct MemoryRegion {
    /// Remote virtual address of the pinned region.
    pub remote_addr: u64,
    /// RDMA remote key authorizing reads of this region.
    pub rkey: u32,
    /// Length of the region in bytes. Reads must stay within
    /// `[remote_addr, remote_addr + len)`.
    pub len: usize,
}
113
/// A reference to a peer shard used as a target for seam calls.
///
/// `Copy + Eq + Hash` so it can be passed by value and used as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ShardRef {
    /// The peer node ID. Becomes `target_node` in outgoing envelopes.
    pub node_id: u64,
    /// The vShard ID on that node. Becomes `vshard_id` in outgoing envelopes.
    pub vshard_id: u32,
}
122
/// Subset of shards selected for a query phase.
///
/// `All` means the caller should contact every shard in the collection —
/// identical to current scatter-gather behaviour. `Subset` carries a pruned
/// list; the coordinator fans out only to those shards.
#[derive(Debug, Clone)]
pub enum ShardSubset {
    /// Contact all shards in the collection (default scatter-gather).
    All,
    /// Contact only this subset of shard IDs.
    Subset(Vec<u32>),
}

impl ShardSubset {
    /// Resolve the subset against a full shard list.
    ///
    /// Returns the shards to actually contact: the full `all_shards` list
    /// for `All`, or the stored subset for `Subset`.
    ///
    /// The stored subset is returned verbatim — it is NOT filtered against
    /// `all_shards` (filtering cannot be done here without allocating, and
    /// this method returns a borrowed slice). Callers that cannot tolerate
    /// stale or unknown shard IDs (e.g. after a rebalance) must validate
    /// the returned slice themselves.
    pub fn resolve<'a>(&'a self, all_shards: &'a [u32]) -> &'a [u32] {
        match self {
            ShardSubset::All => all_shards,
            ShardSubset::Subset(ids) => ids.as_slice(),
        }
    }
}
148
/// Cross-shard exchange seam for distributed vector search algorithms.
///
/// The default implementations preserve the current flat scatter-gather
/// behaviour. Concrete types implementing advanced algorithms (Compass,
/// SPIRE, CoTra-RDMA) override only the methods they need. Existing call
/// sites that construct `VectorScatterGather` directly are unaffected.
///
/// # Plane safety
///
/// This trait is `Send + Sync + 'static` so it can be held by the Control
/// Plane coordinator. Methods that touch the Data Plane must do so via the
/// existing SPSC bridge — they must not perform storage I/O directly.
///
/// NOTE(review): `direct_message` uses a return-position `impl Future`
/// (RPITIT), which makes this trait not dyn-compatible — it cannot be held
/// as `Box<dyn VectorShardSeam>` / `Arc<dyn VectorShardSeam>`; the holder
/// must be generic over the concrete seam type. Confirm this matches the
/// coordinator's intended ownership model.
pub trait VectorShardSeam: Send + Sync + 'static {
    /// Select the subset of shards to contact for a query.
    ///
    /// Default: `ShardSubset::All` — equivalent to current scatter-gather.
    ///
    /// Compass overrides this to perform a coarse-code lookup against
    /// previously obtained shard descriptors and return a pruned
    /// `ShardSubset::Subset`. The coordinator calls this before building
    /// scatter envelopes.
    fn select_shards(&self, _query_vector: &[f32], _all_shards: &[u32]) -> ShardSubset {
        ShardSubset::All
    }

    /// Build-time exchange with a peer shard.
    ///
    /// Default: no-op — returns `Ok(())` immediately without touching
    /// `_dispatch`.
    ///
    /// SPIRE overrides this to send the local shard's IVF centroid table to
    /// `peer` via `direct_message` so the peer can build cross-shard centroid
    /// knowledge. Called once per peer at index-build time, not per query.
    ///
    /// Implementors must not block the Tokio thread pool for heavy work;
    /// offload encoding to `spawn_blocking` if the payload is large.
    fn build_time_exchange(&self, _peer: ShardRef, _dispatch: &dyn ShardRpcDispatch) -> Result<()> {
        Ok(())
    }

    /// Send a message directly to a peer shard and await its reply.
    ///
    /// Default: routes through `dispatch` (coordinator-mediated QUIC) if
    /// provided, making it equivalent to a normal RPC call. Algorithms that
    /// need genuinely shard-to-shard messaging (without coordinator
    /// involvement) implement this method with a direct connection pool.
    ///
    /// When direct messaging is structurally impossible for the given
    /// implementation, return
    /// `Err(VectorSeamError::DirectMessagingUnsupported)`.
    ///
    /// # Wire mapping
    ///
    /// The `msg` payload is wrapped in a `VShardEnvelope` using the
    /// appropriate `VShardMessageType` for `msg.kind` before dispatch.
    ///
    /// Note: the envelope (including the payload copy) is built eagerly
    /// when this method is called, before the returned future is awaited;
    /// only the dispatch round-trip itself is deferred into the future.
    /// The hard-coded `5_000` is the dispatch timeout in milliseconds.
    fn direct_message(
        &self,
        peer: ShardRef,
        msg: ShardMessage,
        dispatch: &dyn ShardRpcDispatch,
        source_node: u64,
    ) -> impl std::future::Future<Output = std::result::Result<ShardMessageReply, VectorSeamError>> + Send
    {
        let envelope = build_message_envelope(source_node, peer, &msg);
        async move {
            let reply = dispatch
                .call(envelope, 5_000)
                .await
                .map_err(VectorSeamError::Transport)?;
            Ok(ShardMessageReply {
                payload: reply.payload,
            })
        }
    }

    /// Return the memory region this shard exposes for one-sided reads, if any.
    ///
    /// Default: `None` — RDMA direct reads are not supported.
    ///
    /// CoTra-RDMA overrides this to return the registered pinned region for
    /// the HNSW graph. The calling shard uses the returned `MemoryRegion` to
    /// issue one-sided RDMA READs without involving this shard's CPU. When
    /// `None` is returned, callers fall back to coordinator-mediated RPC.
    fn exposed_region(&self) -> Option<MemoryRegion> {
        None
    }
}
235
236/// Encode a `ShardMessage` into a `VShardEnvelope` for dispatch.
237fn build_message_envelope(source_node: u64, peer: ShardRef, msg: &ShardMessage) -> VShardEnvelope {
238    use crate::wire::{VShardEnvelope, VShardMessageType, WIRE_VERSION};
239    let msg_type = match msg.kind {
240        ShardMessageKind::CompassCoarseDescriptor => VShardMessageType::VectorCoarseRouteRequest,
241        ShardMessageKind::SpireCentroidTable => VShardMessageType::VectorBuildExchangeRequest,
242    };
243    VShardEnvelope {
244        version: WIRE_VERSION,
245        msg_type,
246        source_node,
247        target_node: peer.node_id,
248        vshard_id: peer.vshard_id,
249        payload: msg.payload.clone(),
250    }
251}
252
#[cfg(test)]
mod tests {
    use super::*;

    /// A no-op seam using all default implementations — exercises the
    /// trait's documented default behaviour without any overrides.
    struct DefaultSeam;
    impl VectorShardSeam for DefaultSeam {}

    #[test]
    fn default_select_shards_returns_all() {
        let seam = DefaultSeam;
        let all = [0u32, 1, 2, 3];
        let result = seam.select_shards(&[0.1, 0.2], &all);
        assert!(matches!(result, ShardSubset::All));
        // `All` resolves to the full input list.
        assert_eq!(result.resolve(&all), &all);
    }

    #[test]
    fn default_exposed_region_is_none() {
        let seam = DefaultSeam;
        assert!(seam.exposed_region().is_none());
    }

    #[test]
    fn default_build_time_exchange_is_noop() {
        // Dispatch stub that fails on every call — proves the default
        // `build_time_exchange` succeeds without ever invoking dispatch.
        struct MockDispatch;
        #[async_trait::async_trait]
        impl crate::distributed_array::rpc::ShardRpcDispatch for MockDispatch {
            async fn call(
                &self,
                _req: VShardEnvelope,
                _timeout_ms: u64,
            ) -> crate::error::Result<VShardEnvelope> {
                Err(crate::error::ClusterError::Transport {
                    detail: "mock".into(),
                })
            }
        }
        let seam = DefaultSeam;
        let peer = ShardRef {
            node_id: 2,
            vshard_id: 5,
        };
        let dispatch = MockDispatch;
        // Default build_time_exchange is a no-op — never calls dispatch.
        assert!(seam.build_time_exchange(peer, &dispatch).is_ok());
    }

    #[test]
    fn shard_subset_resolve_subset() {
        let all = [0u32, 1, 2, 3, 4];
        let subset = ShardSubset::Subset(vec![1, 3]);
        assert_eq!(subset.resolve(&all), &[1u32, 3]);
    }

    #[test]
    fn shard_subset_resolve_all() {
        let all = [0u32, 1, 2];
        let subset = ShardSubset::All;
        assert_eq!(subset.resolve(&all), &[0u32, 1, 2]);
    }

    #[test]
    fn memory_region_fields() {
        let region = MemoryRegion {
            remote_addr: 0xDEAD_BEEF_0000_0000,
            rkey: 42,
            len: 1024 * 1024,
        };
        assert_eq!(region.rkey, 42);
        assert_eq!(region.len, 1024 * 1024);
    }

    #[test]
    fn build_message_envelope_compass() {
        use crate::wire::VShardMessageType;
        let peer = ShardRef {
            node_id: 7,
            vshard_id: 3,
        };
        let msg = ShardMessage {
            kind: ShardMessageKind::CompassCoarseDescriptor,
            payload: vec![1, 2, 3],
        };
        let env = build_message_envelope(1, peer, &msg);
        // Compass coarse descriptors map to the coarse-route request type.
        assert_eq!(env.msg_type, VShardMessageType::VectorCoarseRouteRequest);
        assert_eq!(env.target_node, 7);
        assert_eq!(env.vshard_id, 3);
        assert_eq!(env.payload, vec![1, 2, 3]);
    }

    #[test]
    fn build_message_envelope_spire() {
        use crate::wire::VShardMessageType;
        let peer = ShardRef {
            node_id: 9,
            vshard_id: 1,
        };
        let msg = ShardMessage {
            kind: ShardMessageKind::SpireCentroidTable,
            payload: vec![0xFF, 0xAA],
        };
        let env = build_message_envelope(2, peer, &msg);
        // SPIRE centroid tables map to the build-exchange request type.
        assert_eq!(env.msg_type, VShardMessageType::VectorBuildExchangeRequest);
        assert_eq!(env.source_node, 2);
    }
}
359}