Skip to main content

nodedb_cluster/
vshard_handler.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Shard-side handler for incoming VShardEnvelope messages.
4//!
5//! When a remote node sends a VShardEnvelope via QUIC (wrapped in
6//! `RaftRpc::VShardEnvelope`), the transport server routes it here.
7//! This handler dispatches based on `VShardMessageType` to the
8//! appropriate engine handler:
9//!
10//! - Graph BSP messages → graph algorithm shard handler
11//! - Timeseries scatter → local scan + partial aggregate response
12//! - Retention command → local retention enforcement
13//! - S3 archive command → local archive execution
14
15use crate::wire::{VShardEnvelope, VShardMessageType};
16
17/// Result of handling a VShardEnvelope.
18pub enum HandleResult {
19    /// Send a response envelope back to the coordinator.
20    Response(VShardEnvelope),
21    /// No response needed (fire-and-forget message).
22    NoResponse,
23    /// Handler error.
24    Error(String),
25}
26
27/// Trait for handling incoming VShardEnvelope messages on a shard.
28///
29/// Implemented by the main binary, which has access to the Data Plane
30/// engines (timeseries scan, graph traversal, etc.).
31pub trait VShardHandler: Send + Sync + 'static {
32    /// Handle an incoming VShardEnvelope and optionally produce a response.
33    fn handle_vshard_envelope(
34        &self,
35        envelope: VShardEnvelope,
36    ) -> impl std::future::Future<Output = HandleResult> + Send;
37}
38
39/// Default handler that dispatches based on message type.
40///
41/// The main binary implements `VShardHandler` and uses this function
42/// as the dispatch core, delegating to engine-specific handlers.
43pub fn dispatch_by_type(envelope: &VShardEnvelope) -> DispatchTarget {
44    match envelope.msg_type {
45        // Graph BSP
46        VShardMessageType::GraphAlgoSuperstep => DispatchTarget::GraphBsp,
47        VShardMessageType::GraphAlgoContributions => DispatchTarget::GraphBsp,
48        VShardMessageType::GraphAlgoSuperstepAck => DispatchTarget::GraphBsp,
49        VShardMessageType::GraphAlgoComplete => DispatchTarget::GraphBsp,
50
51        // Timeseries distributed
52        VShardMessageType::TsScatterRequest => DispatchTarget::TimeseriesScan,
53        VShardMessageType::TsScatterResponse => DispatchTarget::TimeseriesCoordinator,
54        VShardMessageType::TsRetentionCommand => DispatchTarget::TimeseriesRetention,
55        VShardMessageType::TsRetentionAck => DispatchTarget::TimeseriesCoordinator,
56        VShardMessageType::TsArchiveCommand => DispatchTarget::TimeseriesArchive,
57        VShardMessageType::TsArchiveAck => DispatchTarget::TimeseriesCoordinator,
58
59        // Migration / infrastructure
60        VShardMessageType::SegmentChunk
61        | VShardMessageType::SegmentComplete
62        | VShardMessageType::WalTail
63        | VShardMessageType::RoutingUpdate
64        | VShardMessageType::RoutingAck => DispatchTarget::Migration,
65
66        VShardMessageType::GhostCreate
67        | VShardMessageType::GhostDelete
68        | VShardMessageType::GhostVerifyRequest
69        | VShardMessageType::GhostVerifyResponse => DispatchTarget::Ghost,
70
71        VShardMessageType::MigrationBaseCopy => DispatchTarget::Migration,
72        VShardMessageType::GsiForward => DispatchTarget::Forward,
73        VShardMessageType::EdgeValidation => DispatchTarget::GraphValidation,
74
75        // Vector distributed search
76        VShardMessageType::VectorScatterRequest => DispatchTarget::VectorSearch,
77        VShardMessageType::VectorScatterResponse => DispatchTarget::VectorCoordinator,
78
79        // Compass coarse-routing phase
80        VShardMessageType::VectorCoarseRouteRequest => DispatchTarget::VectorCoarseRoute,
81        VShardMessageType::VectorCoarseRouteResponse => DispatchTarget::VectorCoordinator,
82
83        // SPIRE build-time centroid exchange
84        VShardMessageType::VectorBuildExchangeRequest => DispatchTarget::VectorBuildExchange,
85        VShardMessageType::VectorBuildExchangeResponse => DispatchTarget::VectorBuildExchange,
86
87        // CoTra-RDMA memory region registration
88        VShardMessageType::VectorMemRegionRequest => DispatchTarget::VectorMemRegion,
89        VShardMessageType::VectorMemRegionResponse => DispatchTarget::VectorMemRegion,
90
91        // Spatial distributed queries
92        VShardMessageType::SpatialScatterRequest => DispatchTarget::SpatialSearch,
93        VShardMessageType::SpatialScatterResponse => DispatchTarget::SpatialCoordinator,
94
95        // Event Plane cross-shard delivery
96        VShardMessageType::CrossShardEvent => DispatchTarget::EventPlane,
97        VShardMessageType::CrossShardEventAck => DispatchTarget::EventPlane,
98        VShardMessageType::NotifyBroadcast => DispatchTarget::EventPlane,
99        VShardMessageType::NotifyBroadcastAck => DispatchTarget::EventPlane,
100
101        // Distributed array (Hilbert-sharded sparse arrays)
102        VShardMessageType::ArrayShardSliceReq => DispatchTarget::ArrayShard,
103        VShardMessageType::ArrayShardSliceResp => DispatchTarget::ArrayCoordinator,
104        VShardMessageType::ArrayShardAggReq => DispatchTarget::ArrayShard,
105        VShardMessageType::ArrayShardAggResp => DispatchTarget::ArrayCoordinator,
106        VShardMessageType::ArrayShardPutReq => DispatchTarget::ArrayShard,
107        VShardMessageType::ArrayShardPutResp => DispatchTarget::ArrayCoordinator,
108        VShardMessageType::ArrayShardDeleteReq => DispatchTarget::ArrayShard,
109        VShardMessageType::ArrayShardDeleteResp => DispatchTarget::ArrayCoordinator,
110        VShardMessageType::ArrayShardSurrogateBitmapReq => DispatchTarget::ArrayShard,
111        VShardMessageType::ArrayShardSurrogateBitmapResp => DispatchTarget::ArrayCoordinator,
112    }
113}
114
115/// Which engine subsystem should handle this envelope.
116#[derive(Debug, Clone, Copy, PartialEq, Eq)]
117pub enum DispatchTarget {
118    /// Graph BSP algorithm handler.
119    GraphBsp,
120    /// Graph edge validation.
121    GraphValidation,
122    /// Timeseries local scan (shard executes scan, returns partials).
123    TimeseriesScan,
124    /// Timeseries coordinator (receives shard responses).
125    TimeseriesCoordinator,
126    /// Timeseries retention enforcement on local shard.
127    TimeseriesRetention,
128    /// Timeseries S3 archive execution on local shard.
129    TimeseriesArchive,
130    /// Vector local k-NN search (shard executes search, returns top-K hits).
131    VectorSearch,
132    /// Vector coordinator (receives shard search responses).
133    VectorCoordinator,
134    /// Compass coarse-route handler: shard returns its coarse routing descriptor.
135    VectorCoarseRoute,
136    /// SPIRE build-time exchange: shard sends or receives IVF centroid tables.
137    VectorBuildExchange,
138    /// CoTra-RDMA memory region handler: shard registers or exposes a pinned
139    /// memory region for one-sided reads by a remote peer.
140    VectorMemRegion,
141    /// Migration infrastructure.
142    Migration,
143    /// Ghost stub management.
144    Ghost,
145    /// Query/transaction forwarding.
146    Forward,
147    /// Spatial local R-tree search (shard executes predicate, returns matching docs).
148    SpatialSearch,
149    /// Spatial coordinator (receives shard search responses).
150    SpatialCoordinator,
151    /// Event Plane cross-shard delivery (trigger DML, CDC events).
152    EventPlane,
153    /// Array shard: executes slice/agg/put/delete/surrogate-scan locally.
154    ArrayShard,
155    /// Array coordinator: receives shard responses and merges them.
156    ArrayCoordinator,
157}
158
159/// Build a response envelope for a timeseries scatter response.
160pub fn build_ts_scatter_response(
161    source_node: u64,
162    target_node: u64,
163    vshard_id: u32,
164    partials_json: &[u8],
165) -> VShardEnvelope {
166    VShardEnvelope::new(
167        VShardMessageType::TsScatterResponse,
168        source_node,
169        target_node,
170        vshard_id,
171        partials_json.to_vec(),
172    )
173}
174
175/// Build a response envelope for a retention acknowledgement.
176pub fn build_ts_retention_ack(
177    source_node: u64,
178    target_node: u64,
179    vshard_id: u32,
180    result_json: &[u8],
181) -> VShardEnvelope {
182    VShardEnvelope::new(
183        VShardMessageType::TsRetentionAck,
184        source_node,
185        target_node,
186        vshard_id,
187        result_json.to_vec(),
188    )
189}
190
191#[cfg(test)]
192mod tests {
193    use super::*;
194
195    #[test]
196    fn dispatch_graph_bsp() {
197        let env = VShardEnvelope::new(VShardMessageType::GraphAlgoSuperstep, 1, 2, 42, vec![]);
198        assert_eq!(dispatch_by_type(&env), DispatchTarget::GraphBsp);
199    }
200
201    #[test]
202    fn dispatch_ts_scatter() {
203        let env = VShardEnvelope::new(VShardMessageType::TsScatterRequest, 1, 2, 42, vec![]);
204        assert_eq!(dispatch_by_type(&env), DispatchTarget::TimeseriesScan);
205    }
206
207    #[test]
208    fn dispatch_ts_retention() {
209        let env = VShardEnvelope::new(VShardMessageType::TsRetentionCommand, 1, 2, 42, vec![]);
210        assert_eq!(dispatch_by_type(&env), DispatchTarget::TimeseriesRetention);
211    }
212
213    #[test]
214    fn dispatch_migration() {
215        let env = VShardEnvelope::new(VShardMessageType::SegmentChunk, 1, 2, 42, vec![]);
216        assert_eq!(dispatch_by_type(&env), DispatchTarget::Migration);
217    }
218
219    #[test]
220    fn all_message_types_dispatched() {
221        // Every VShardMessageType must map to a DispatchTarget —
222        // this test ensures no new types are added without a handler.
223        let all_types = [
224            VShardMessageType::SegmentChunk,
225            VShardMessageType::SegmentComplete,
226            VShardMessageType::WalTail,
227            VShardMessageType::RoutingUpdate,
228            VShardMessageType::RoutingAck,
229            VShardMessageType::GhostCreate,
230            VShardMessageType::GhostDelete,
231            VShardMessageType::GhostVerifyRequest,
232            VShardMessageType::GhostVerifyResponse,
233            VShardMessageType::MigrationBaseCopy,
234            VShardMessageType::GsiForward,
235            VShardMessageType::EdgeValidation,
236            VShardMessageType::GraphAlgoSuperstep,
237            VShardMessageType::GraphAlgoContributions,
238            VShardMessageType::GraphAlgoSuperstepAck,
239            VShardMessageType::GraphAlgoComplete,
240            VShardMessageType::TsScatterRequest,
241            VShardMessageType::TsScatterResponse,
242            VShardMessageType::TsRetentionCommand,
243            VShardMessageType::TsRetentionAck,
244            VShardMessageType::TsArchiveCommand,
245            VShardMessageType::TsArchiveAck,
246            VShardMessageType::VectorScatterRequest,
247            VShardMessageType::VectorScatterResponse,
248            VShardMessageType::VectorCoarseRouteRequest,
249            VShardMessageType::VectorCoarseRouteResponse,
250            VShardMessageType::VectorBuildExchangeRequest,
251            VShardMessageType::VectorBuildExchangeResponse,
252            VShardMessageType::VectorMemRegionRequest,
253            VShardMessageType::VectorMemRegionResponse,
254            VShardMessageType::SpatialScatterRequest,
255            VShardMessageType::SpatialScatterResponse,
256            VShardMessageType::CrossShardEvent,
257            VShardMessageType::CrossShardEventAck,
258            VShardMessageType::NotifyBroadcast,
259            VShardMessageType::NotifyBroadcastAck,
260            VShardMessageType::ArrayShardSliceReq,
261            VShardMessageType::ArrayShardSliceResp,
262            VShardMessageType::ArrayShardAggReq,
263            VShardMessageType::ArrayShardAggResp,
264            VShardMessageType::ArrayShardPutReq,
265            VShardMessageType::ArrayShardPutResp,
266            VShardMessageType::ArrayShardDeleteReq,
267            VShardMessageType::ArrayShardDeleteResp,
268            VShardMessageType::ArrayShardSurrogateBitmapReq,
269            VShardMessageType::ArrayShardSurrogateBitmapResp,
270        ];
271        for msg_type in all_types {
272            let env = VShardEnvelope::new(msg_type, 1, 2, 0, vec![]);
273            let _ = dispatch_by_type(&env); // Must not panic.
274        }
275    }
276}