1use crate::wire::{VShardEnvelope, VShardMessageType};
16
17pub enum HandleResult {
19 Response(VShardEnvelope),
21 NoResponse,
23 Error(String),
25}
26
27pub trait VShardHandler: Send + Sync + 'static {
32 fn handle_vshard_envelope(
34 &self,
35 envelope: VShardEnvelope,
36 ) -> impl std::future::Future<Output = HandleResult> + Send;
37}
38
39pub fn dispatch_by_type(envelope: &VShardEnvelope) -> DispatchTarget {
44 match envelope.msg_type {
45 VShardMessageType::GraphAlgoSuperstep => DispatchTarget::GraphBsp,
47 VShardMessageType::GraphAlgoContributions => DispatchTarget::GraphBsp,
48 VShardMessageType::GraphAlgoSuperstepAck => DispatchTarget::GraphBsp,
49 VShardMessageType::GraphAlgoComplete => DispatchTarget::GraphBsp,
50
51 VShardMessageType::TsScatterRequest => DispatchTarget::TimeseriesScan,
53 VShardMessageType::TsScatterResponse => DispatchTarget::TimeseriesCoordinator,
54 VShardMessageType::TsRetentionCommand => DispatchTarget::TimeseriesRetention,
55 VShardMessageType::TsRetentionAck => DispatchTarget::TimeseriesCoordinator,
56 VShardMessageType::TsArchiveCommand => DispatchTarget::TimeseriesArchive,
57 VShardMessageType::TsArchiveAck => DispatchTarget::TimeseriesCoordinator,
58
59 VShardMessageType::SegmentChunk
61 | VShardMessageType::SegmentComplete
62 | VShardMessageType::WalTail
63 | VShardMessageType::RoutingUpdate
64 | VShardMessageType::RoutingAck => DispatchTarget::Migration,
65
66 VShardMessageType::GhostCreate
67 | VShardMessageType::GhostDelete
68 | VShardMessageType::GhostVerifyRequest
69 | VShardMessageType::GhostVerifyResponse => DispatchTarget::Ghost,
70
71 VShardMessageType::MigrationBaseCopy => DispatchTarget::Migration,
72 VShardMessageType::GsiForward => DispatchTarget::Forward,
73 VShardMessageType::EdgeValidation => DispatchTarget::GraphValidation,
74
75 VShardMessageType::VectorScatterRequest => DispatchTarget::VectorSearch,
77 VShardMessageType::VectorScatterResponse => DispatchTarget::VectorCoordinator,
78
79 VShardMessageType::VectorCoarseRouteRequest => DispatchTarget::VectorCoarseRoute,
81 VShardMessageType::VectorCoarseRouteResponse => DispatchTarget::VectorCoordinator,
82
83 VShardMessageType::VectorBuildExchangeRequest => DispatchTarget::VectorBuildExchange,
85 VShardMessageType::VectorBuildExchangeResponse => DispatchTarget::VectorBuildExchange,
86
87 VShardMessageType::VectorMemRegionRequest => DispatchTarget::VectorMemRegion,
89 VShardMessageType::VectorMemRegionResponse => DispatchTarget::VectorMemRegion,
90
91 VShardMessageType::SpatialScatterRequest => DispatchTarget::SpatialSearch,
93 VShardMessageType::SpatialScatterResponse => DispatchTarget::SpatialCoordinator,
94
95 VShardMessageType::CrossShardEvent => DispatchTarget::EventPlane,
97 VShardMessageType::CrossShardEventAck => DispatchTarget::EventPlane,
98 VShardMessageType::NotifyBroadcast => DispatchTarget::EventPlane,
99 VShardMessageType::NotifyBroadcastAck => DispatchTarget::EventPlane,
100
101 VShardMessageType::ArrayShardSliceReq => DispatchTarget::ArrayShard,
103 VShardMessageType::ArrayShardSliceResp => DispatchTarget::ArrayCoordinator,
104 VShardMessageType::ArrayShardAggReq => DispatchTarget::ArrayShard,
105 VShardMessageType::ArrayShardAggResp => DispatchTarget::ArrayCoordinator,
106 VShardMessageType::ArrayShardPutReq => DispatchTarget::ArrayShard,
107 VShardMessageType::ArrayShardPutResp => DispatchTarget::ArrayCoordinator,
108 VShardMessageType::ArrayShardDeleteReq => DispatchTarget::ArrayShard,
109 VShardMessageType::ArrayShardDeleteResp => DispatchTarget::ArrayCoordinator,
110 VShardMessageType::ArrayShardSurrogateBitmapReq => DispatchTarget::ArrayShard,
111 VShardMessageType::ArrayShardSurrogateBitmapResp => DispatchTarget::ArrayCoordinator,
112 }
113}
114
115#[derive(Debug, Clone, Copy, PartialEq, Eq)]
117pub enum DispatchTarget {
118 GraphBsp,
120 GraphValidation,
122 TimeseriesScan,
124 TimeseriesCoordinator,
126 TimeseriesRetention,
128 TimeseriesArchive,
130 VectorSearch,
132 VectorCoordinator,
134 VectorCoarseRoute,
136 VectorBuildExchange,
138 VectorMemRegion,
141 Migration,
143 Ghost,
145 Forward,
147 SpatialSearch,
149 SpatialCoordinator,
151 EventPlane,
153 ArrayShard,
155 ArrayCoordinator,
157}
158
159pub fn build_ts_scatter_response(
161 source_node: u64,
162 target_node: u64,
163 vshard_id: u32,
164 partials_json: &[u8],
165) -> VShardEnvelope {
166 VShardEnvelope::new(
167 VShardMessageType::TsScatterResponse,
168 source_node,
169 target_node,
170 vshard_id,
171 partials_json.to_vec(),
172 )
173}
174
175pub fn build_ts_retention_ack(
177 source_node: u64,
178 target_node: u64,
179 vshard_id: u32,
180 result_json: &[u8],
181) -> VShardEnvelope {
182 VShardEnvelope::new(
183 VShardMessageType::TsRetentionAck,
184 source_node,
185 target_node,
186 vshard_id,
187 result_json.to_vec(),
188 )
189}
190
191#[cfg(test)]
192mod tests {
193 use super::*;
194
195 #[test]
196 fn dispatch_graph_bsp() {
197 let env = VShardEnvelope::new(VShardMessageType::GraphAlgoSuperstep, 1, 2, 42, vec![]);
198 assert_eq!(dispatch_by_type(&env), DispatchTarget::GraphBsp);
199 }
200
201 #[test]
202 fn dispatch_ts_scatter() {
203 let env = VShardEnvelope::new(VShardMessageType::TsScatterRequest, 1, 2, 42, vec![]);
204 assert_eq!(dispatch_by_type(&env), DispatchTarget::TimeseriesScan);
205 }
206
207 #[test]
208 fn dispatch_ts_retention() {
209 let env = VShardEnvelope::new(VShardMessageType::TsRetentionCommand, 1, 2, 42, vec![]);
210 assert_eq!(dispatch_by_type(&env), DispatchTarget::TimeseriesRetention);
211 }
212
213 #[test]
214 fn dispatch_migration() {
215 let env = VShardEnvelope::new(VShardMessageType::SegmentChunk, 1, 2, 42, vec![]);
216 assert_eq!(dispatch_by_type(&env), DispatchTarget::Migration);
217 }
218
219 #[test]
220 fn all_message_types_dispatched() {
221 let all_types = [
224 VShardMessageType::SegmentChunk,
225 VShardMessageType::SegmentComplete,
226 VShardMessageType::WalTail,
227 VShardMessageType::RoutingUpdate,
228 VShardMessageType::RoutingAck,
229 VShardMessageType::GhostCreate,
230 VShardMessageType::GhostDelete,
231 VShardMessageType::GhostVerifyRequest,
232 VShardMessageType::GhostVerifyResponse,
233 VShardMessageType::MigrationBaseCopy,
234 VShardMessageType::GsiForward,
235 VShardMessageType::EdgeValidation,
236 VShardMessageType::GraphAlgoSuperstep,
237 VShardMessageType::GraphAlgoContributions,
238 VShardMessageType::GraphAlgoSuperstepAck,
239 VShardMessageType::GraphAlgoComplete,
240 VShardMessageType::TsScatterRequest,
241 VShardMessageType::TsScatterResponse,
242 VShardMessageType::TsRetentionCommand,
243 VShardMessageType::TsRetentionAck,
244 VShardMessageType::TsArchiveCommand,
245 VShardMessageType::TsArchiveAck,
246 VShardMessageType::VectorScatterRequest,
247 VShardMessageType::VectorScatterResponse,
248 VShardMessageType::VectorCoarseRouteRequest,
249 VShardMessageType::VectorCoarseRouteResponse,
250 VShardMessageType::VectorBuildExchangeRequest,
251 VShardMessageType::VectorBuildExchangeResponse,
252 VShardMessageType::VectorMemRegionRequest,
253 VShardMessageType::VectorMemRegionResponse,
254 VShardMessageType::SpatialScatterRequest,
255 VShardMessageType::SpatialScatterResponse,
256 VShardMessageType::CrossShardEvent,
257 VShardMessageType::CrossShardEventAck,
258 VShardMessageType::NotifyBroadcast,
259 VShardMessageType::NotifyBroadcastAck,
260 VShardMessageType::ArrayShardSliceReq,
261 VShardMessageType::ArrayShardSliceResp,
262 VShardMessageType::ArrayShardAggReq,
263 VShardMessageType::ArrayShardAggResp,
264 VShardMessageType::ArrayShardPutReq,
265 VShardMessageType::ArrayShardPutResp,
266 VShardMessageType::ArrayShardDeleteReq,
267 VShardMessageType::ArrayShardDeleteResp,
268 VShardMessageType::ArrayShardSurrogateBitmapReq,
269 VShardMessageType::ArrayShardSurrogateBitmapResp,
270 ];
271 for msg_type in all_types {
272 let env = VShardEnvelope::new(msg_type, 1, 2, 0, vec![]);
273 let _ = dispatch_by_type(&env); }
275 }
276}