Skip to main content

nodedb_cluster/
vshard_handler.rs

1//! Shard-side handler for incoming VShardEnvelope messages.
2//!
3//! When a remote node sends a VShardEnvelope via QUIC (wrapped in
4//! `RaftRpc::VShardEnvelope`), the transport server routes it here.
5//! This handler dispatches based on `VShardMessageType` to the
6//! appropriate engine handler:
7//!
8//! - Graph BSP messages → graph algorithm shard handler
9//! - Timeseries scatter → local scan + partial aggregate response
10//! - Retention command → local retention enforcement
11//! - S3 archive command → local archive execution
12
13use crate::wire::{VShardEnvelope, VShardMessageType};
14
15/// Result of handling a VShardEnvelope.
16pub enum HandleResult {
17    /// Send a response envelope back to the coordinator.
18    Response(VShardEnvelope),
19    /// No response needed (fire-and-forget message).
20    NoResponse,
21    /// Handler error.
22    Error(String),
23}
24
25/// Trait for handling incoming VShardEnvelope messages on a shard.
26///
27/// Implemented by the main binary, which has access to the Data Plane
28/// engines (timeseries scan, graph traversal, etc.).
29pub trait VShardHandler: Send + Sync + 'static {
30    /// Handle an incoming VShardEnvelope and optionally produce a response.
31    fn handle_vshard_envelope(
32        &self,
33        envelope: VShardEnvelope,
34    ) -> impl std::future::Future<Output = HandleResult> + Send;
35}
36
37/// Default handler that dispatches based on message type.
38///
39/// The main binary implements `VShardHandler` and uses this function
40/// as the dispatch core, delegating to engine-specific handlers.
41pub fn dispatch_by_type(envelope: &VShardEnvelope) -> DispatchTarget {
42    match envelope.msg_type {
43        // Graph BSP
44        VShardMessageType::GraphAlgoSuperstep => DispatchTarget::GraphBsp,
45        VShardMessageType::GraphAlgoContributions => DispatchTarget::GraphBsp,
46        VShardMessageType::GraphAlgoSuperstepAck => DispatchTarget::GraphBsp,
47        VShardMessageType::GraphAlgoComplete => DispatchTarget::GraphBsp,
48
49        // Timeseries distributed
50        VShardMessageType::TsScatterRequest => DispatchTarget::TimeseriesScan,
51        VShardMessageType::TsScatterResponse => DispatchTarget::TimeseriesCoordinator,
52        VShardMessageType::TsRetentionCommand => DispatchTarget::TimeseriesRetention,
53        VShardMessageType::TsRetentionAck => DispatchTarget::TimeseriesCoordinator,
54        VShardMessageType::TsArchiveCommand => DispatchTarget::TimeseriesArchive,
55        VShardMessageType::TsArchiveAck => DispatchTarget::TimeseriesCoordinator,
56
57        // Migration / infrastructure
58        VShardMessageType::SegmentChunk
59        | VShardMessageType::SegmentComplete
60        | VShardMessageType::WalTail
61        | VShardMessageType::RoutingUpdate
62        | VShardMessageType::RoutingAck => DispatchTarget::Migration,
63
64        VShardMessageType::GhostCreate
65        | VShardMessageType::GhostDelete
66        | VShardMessageType::GhostVerifyRequest
67        | VShardMessageType::GhostVerifyResponse => DispatchTarget::Ghost,
68
69        VShardMessageType::MigrationBaseCopy => DispatchTarget::Migration,
70        VShardMessageType::CrossShardForward => DispatchTarget::Forward,
71        VShardMessageType::GsiForward => DispatchTarget::Forward,
72        VShardMessageType::EdgeValidation => DispatchTarget::GraphValidation,
73
74        // Vector distributed search
75        VShardMessageType::VectorScatterRequest => DispatchTarget::VectorSearch,
76        VShardMessageType::VectorScatterResponse => DispatchTarget::VectorCoordinator,
77
78        // Spatial distributed queries
79        VShardMessageType::SpatialScatterRequest => DispatchTarget::SpatialSearch,
80        VShardMessageType::SpatialScatterResponse => DispatchTarget::SpatialCoordinator,
81
82        // Event Plane cross-shard delivery
83        VShardMessageType::CrossShardEvent => DispatchTarget::EventPlane,
84        VShardMessageType::CrossShardEventAck => DispatchTarget::EventPlane,
85        VShardMessageType::NotifyBroadcast => DispatchTarget::EventPlane,
86        VShardMessageType::NotifyBroadcastAck => DispatchTarget::EventPlane,
87    }
88}
89
/// Engine subsystem selected to handle an incoming envelope.
///
/// Derives `Hash` in addition to the equality traits so a target can be
/// used directly as a `HashMap`/`HashSet` key (for example, per-target
/// handler tables or counters).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DispatchTarget {
    /// Graph BSP algorithm handler.
    GraphBsp,
    /// Graph edge validation.
    GraphValidation,
    /// Timeseries local scan: the shard executes the scan and returns
    /// partial aggregates.
    TimeseriesScan,
    /// Timeseries coordinator: receives responses from shards.
    TimeseriesCoordinator,
    /// Timeseries retention enforcement on the local shard.
    TimeseriesRetention,
    /// Timeseries S3 archive execution on the local shard.
    TimeseriesArchive,
    /// Vector local k-NN search: the shard executes the search and
    /// returns its top-K hits.
    VectorSearch,
    /// Vector coordinator: receives search responses from shards.
    VectorCoordinator,
    /// Migration infrastructure.
    Migration,
    /// Ghost stub management.
    Ghost,
    /// Query/transaction forwarding.
    Forward,
    /// Spatial local R-tree search: the shard executes the predicate and
    /// returns matching documents.
    SpatialSearch,
    /// Spatial coordinator: receives search responses from shards.
    SpatialCoordinator,
    /// Event Plane cross-shard delivery (trigger DML, CDC events).
    EventPlane,
}
122
123/// Build a response envelope for a timeseries scatter response.
124pub fn build_ts_scatter_response(
125    source_node: u64,
126    target_node: u64,
127    vshard_id: u16,
128    partials_json: &[u8],
129) -> VShardEnvelope {
130    VShardEnvelope::new(
131        VShardMessageType::TsScatterResponse,
132        source_node,
133        target_node,
134        vshard_id,
135        partials_json.to_vec(),
136    )
137}
138
139/// Build a response envelope for a retention acknowledgement.
140pub fn build_ts_retention_ack(
141    source_node: u64,
142    target_node: u64,
143    vshard_id: u16,
144    result_json: &[u8],
145) -> VShardEnvelope {
146    VShardEnvelope::new(
147        VShardMessageType::TsRetentionAck,
148        source_node,
149        target_node,
150        vshard_id,
151        result_json.to_vec(),
152    )
153}
154
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an empty-payload envelope of the given type. The node ids
    /// and vShard id are arbitrary; dispatch only looks at `msg_type`.
    fn envelope_of(msg_type: VShardMessageType) -> VShardEnvelope {
        VShardEnvelope::new(msg_type, 1, 2, 42, vec![])
    }

    #[test]
    fn dispatch_graph_bsp() {
        let env = envelope_of(VShardMessageType::GraphAlgoSuperstep);
        assert_eq!(dispatch_by_type(&env), DispatchTarget::GraphBsp);
    }

    #[test]
    fn dispatch_ts_scatter() {
        let env = envelope_of(VShardMessageType::TsScatterRequest);
        assert_eq!(dispatch_by_type(&env), DispatchTarget::TimeseriesScan);
    }

    #[test]
    fn dispatch_ts_retention() {
        let env = envelope_of(VShardMessageType::TsRetentionCommand);
        assert_eq!(dispatch_by_type(&env), DispatchTarget::TimeseriesRetention);
    }

    #[test]
    fn dispatch_migration() {
        let env = envelope_of(VShardMessageType::SegmentChunk);
        assert_eq!(dispatch_by_type(&env), DispatchTarget::Migration);
    }

    #[test]
    fn all_message_types_dispatched() {
        // The exhaustive `match` in `dispatch_by_type` already turns an
        // unmapped variant into a compile error. This table pins the
        // mapping itself, so an accidental re-routing of an existing
        // type fails here. New variants must be added to it by hand.
        let expected = [
            (VShardMessageType::SegmentChunk, DispatchTarget::Migration),
            (VShardMessageType::SegmentComplete, DispatchTarget::Migration),
            (VShardMessageType::WalTail, DispatchTarget::Migration),
            (VShardMessageType::RoutingUpdate, DispatchTarget::Migration),
            (VShardMessageType::RoutingAck, DispatchTarget::Migration),
            (VShardMessageType::GhostCreate, DispatchTarget::Ghost),
            (VShardMessageType::GhostDelete, DispatchTarget::Ghost),
            (VShardMessageType::GhostVerifyRequest, DispatchTarget::Ghost),
            (VShardMessageType::GhostVerifyResponse, DispatchTarget::Ghost),
            (VShardMessageType::MigrationBaseCopy, DispatchTarget::Migration),
            (VShardMessageType::CrossShardForward, DispatchTarget::Forward),
            (VShardMessageType::GsiForward, DispatchTarget::Forward),
            (VShardMessageType::EdgeValidation, DispatchTarget::GraphValidation),
            (VShardMessageType::GraphAlgoSuperstep, DispatchTarget::GraphBsp),
            (VShardMessageType::GraphAlgoContributions, DispatchTarget::GraphBsp),
            (VShardMessageType::GraphAlgoSuperstepAck, DispatchTarget::GraphBsp),
            (VShardMessageType::GraphAlgoComplete, DispatchTarget::GraphBsp),
            (VShardMessageType::TsScatterRequest, DispatchTarget::TimeseriesScan),
            (VShardMessageType::TsScatterResponse, DispatchTarget::TimeseriesCoordinator),
            (VShardMessageType::TsRetentionCommand, DispatchTarget::TimeseriesRetention),
            (VShardMessageType::TsRetentionAck, DispatchTarget::TimeseriesCoordinator),
            (VShardMessageType::TsArchiveCommand, DispatchTarget::TimeseriesArchive),
            (VShardMessageType::TsArchiveAck, DispatchTarget::TimeseriesCoordinator),
            (VShardMessageType::VectorScatterRequest, DispatchTarget::VectorSearch),
            (VShardMessageType::VectorScatterResponse, DispatchTarget::VectorCoordinator),
            (VShardMessageType::SpatialScatterRequest, DispatchTarget::SpatialSearch),
            (VShardMessageType::SpatialScatterResponse, DispatchTarget::SpatialCoordinator),
            (VShardMessageType::CrossShardEvent, DispatchTarget::EventPlane),
            (VShardMessageType::CrossShardEventAck, DispatchTarget::EventPlane),
            (VShardMessageType::NotifyBroadcast, DispatchTarget::EventPlane),
            (VShardMessageType::NotifyBroadcastAck, DispatchTarget::EventPlane),
        ];

        // The index is reported instead of the message type because
        // `VShardMessageType` is not known to implement `Debug` here.
        for (index, (msg_type, target)) in (0usize..).zip(expected) {
            let env = envelope_of(msg_type);
            assert_eq!(
                dispatch_by_type(&env),
                target,
                "unexpected dispatch target for table entry #{}",
                index
            );
        }
    }

    #[test]
    fn scatter_response_builder_sets_message_type() {
        let env = build_ts_scatter_response(1, 2, 42, b"[]");
        assert!(matches!(
            env.msg_type,
            VShardMessageType::TsScatterResponse
        ));
        assert_eq!(dispatch_by_type(&env), DispatchTarget::TimeseriesCoordinator);
    }

    #[test]
    fn retention_ack_builder_sets_message_type() {
        let env = build_ts_retention_ack(1, 2, 42, b"{}");
        assert!(matches!(env.msg_type, VShardMessageType::TsRetentionAck));
        assert_eq!(dispatch_by_type(&env), DispatchTarget::TimeseriesCoordinator);
    }
}