1use crate::wire::{VShardEnvelope, VShardMessageType};
14
15pub enum HandleResult {
17 Response(VShardEnvelope),
19 NoResponse,
21 Error(String),
23}
24
25pub trait VShardHandler: Send + Sync + 'static {
30 fn handle_vshard_envelope(
32 &self,
33 envelope: VShardEnvelope,
34 ) -> impl std::future::Future<Output = HandleResult> + Send;
35}
36
37pub fn dispatch_by_type(envelope: &VShardEnvelope) -> DispatchTarget {
42 match envelope.msg_type {
43 VShardMessageType::GraphAlgoSuperstep => DispatchTarget::GraphBsp,
45 VShardMessageType::GraphAlgoContributions => DispatchTarget::GraphBsp,
46 VShardMessageType::GraphAlgoSuperstepAck => DispatchTarget::GraphBsp,
47 VShardMessageType::GraphAlgoComplete => DispatchTarget::GraphBsp,
48
49 VShardMessageType::TsScatterRequest => DispatchTarget::TimeseriesScan,
51 VShardMessageType::TsScatterResponse => DispatchTarget::TimeseriesCoordinator,
52 VShardMessageType::TsRetentionCommand => DispatchTarget::TimeseriesRetention,
53 VShardMessageType::TsRetentionAck => DispatchTarget::TimeseriesCoordinator,
54 VShardMessageType::TsArchiveCommand => DispatchTarget::TimeseriesArchive,
55 VShardMessageType::TsArchiveAck => DispatchTarget::TimeseriesCoordinator,
56
57 VShardMessageType::SegmentChunk
59 | VShardMessageType::SegmentComplete
60 | VShardMessageType::WalTail
61 | VShardMessageType::RoutingUpdate
62 | VShardMessageType::RoutingAck => DispatchTarget::Migration,
63
64 VShardMessageType::GhostCreate
65 | VShardMessageType::GhostDelete
66 | VShardMessageType::GhostVerifyRequest
67 | VShardMessageType::GhostVerifyResponse => DispatchTarget::Ghost,
68
69 VShardMessageType::MigrationBaseCopy => DispatchTarget::Migration,
70 VShardMessageType::CrossShardForward => DispatchTarget::Forward,
71 VShardMessageType::GsiForward => DispatchTarget::Forward,
72 VShardMessageType::EdgeValidation => DispatchTarget::GraphValidation,
73
74 VShardMessageType::VectorScatterRequest => DispatchTarget::VectorSearch,
76 VShardMessageType::VectorScatterResponse => DispatchTarget::VectorCoordinator,
77
78 VShardMessageType::SpatialScatterRequest => DispatchTarget::SpatialSearch,
80 VShardMessageType::SpatialScatterResponse => DispatchTarget::SpatialCoordinator,
81
82 VShardMessageType::CrossShardEvent => DispatchTarget::EventPlane,
84 VShardMessageType::CrossShardEventAck => DispatchTarget::EventPlane,
85 VShardMessageType::NotifyBroadcast => DispatchTarget::EventPlane,
86 VShardMessageType::NotifyBroadcastAck => DispatchTarget::EventPlane,
87 }
88}
89
/// Subsystem selected by [`dispatch_by_type`] for an incoming envelope.
///
/// The per-variant notes list the message types that `dispatch_by_type`
/// currently maps to each target.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DispatchTarget {
    /// `GraphAlgoSuperstep`, `GraphAlgoContributions`,
    /// `GraphAlgoSuperstepAck`, `GraphAlgoComplete`.
    GraphBsp,
    /// `EdgeValidation`.
    GraphValidation,
    /// `TsScatterRequest`.
    TimeseriesScan,
    /// `TsScatterResponse`, `TsRetentionAck`, `TsArchiveAck`.
    TimeseriesCoordinator,
    /// `TsRetentionCommand`.
    TimeseriesRetention,
    /// `TsArchiveCommand`.
    TimeseriesArchive,
    /// `VectorScatterRequest`.
    VectorSearch,
    /// `VectorScatterResponse`.
    VectorCoordinator,
    /// `SegmentChunk`, `SegmentComplete`, `WalTail`, `RoutingUpdate`,
    /// `RoutingAck`, `MigrationBaseCopy`.
    Migration,
    /// `GhostCreate`, `GhostDelete`, `GhostVerifyRequest`,
    /// `GhostVerifyResponse`.
    Ghost,
    /// `CrossShardForward`, `GsiForward`.
    Forward,
    /// `SpatialScatterRequest`.
    SpatialSearch,
    /// `SpatialScatterResponse`.
    SpatialCoordinator,
    /// `CrossShardEvent`, `CrossShardEventAck`, `NotifyBroadcast`,
    /// `NotifyBroadcastAck`.
    EventPlane,
}
122
123pub fn build_ts_scatter_response(
125 source_node: u64,
126 target_node: u64,
127 vshard_id: u16,
128 partials_json: &[u8],
129) -> VShardEnvelope {
130 VShardEnvelope::new(
131 VShardMessageType::TsScatterResponse,
132 source_node,
133 target_node,
134 vshard_id,
135 partials_json.to_vec(),
136 )
137}
138
139pub fn build_ts_retention_ack(
141 source_node: u64,
142 target_node: u64,
143 vshard_id: u16,
144 result_json: &[u8],
145) -> VShardEnvelope {
146 VShardEnvelope::new(
147 VShardMessageType::TsRetentionAck,
148 source_node,
149 target_node,
150 vshard_id,
151 result_json.to_vec(),
152 )
153}
154
#[cfg(test)]
mod tests {
    use super::*;

    /// Wraps `msg_type` in an envelope with node arguments 1 and 2, the given
    /// vShard id and an empty payload, then returns the target it routes to.
    fn route(msg_type: VShardMessageType, vshard_id: u16) -> DispatchTarget {
        let envelope = VShardEnvelope::new(msg_type, 1, 2, vshard_id, vec![]);
        dispatch_by_type(&envelope)
    }

    #[test]
    fn dispatch_graph_bsp() {
        let target = route(VShardMessageType::GraphAlgoSuperstep, 42);
        assert_eq!(target, DispatchTarget::GraphBsp);
    }

    #[test]
    fn dispatch_ts_scatter() {
        let target = route(VShardMessageType::TsScatterRequest, 42);
        assert_eq!(target, DispatchTarget::TimeseriesScan);
    }

    #[test]
    fn dispatch_ts_retention() {
        let target = route(VShardMessageType::TsRetentionCommand, 42);
        assert_eq!(target, DispatchTarget::TimeseriesRetention);
    }

    #[test]
    fn dispatch_migration() {
        let target = route(VShardMessageType::SegmentChunk, 42);
        assert_eq!(target, DispatchTarget::Migration);
    }

    #[test]
    fn all_message_types_dispatched() {
        // Hand-maintained list of message types; each one is routed and the
        // result discarded, so this only checks that dispatch does not panic.
        let every_type = [
            VShardMessageType::SegmentChunk,
            VShardMessageType::SegmentComplete,
            VShardMessageType::WalTail,
            VShardMessageType::RoutingUpdate,
            VShardMessageType::RoutingAck,
            VShardMessageType::GhostCreate,
            VShardMessageType::GhostDelete,
            VShardMessageType::GhostVerifyRequest,
            VShardMessageType::GhostVerifyResponse,
            VShardMessageType::MigrationBaseCopy,
            VShardMessageType::CrossShardForward,
            VShardMessageType::GsiForward,
            VShardMessageType::EdgeValidation,
            VShardMessageType::GraphAlgoSuperstep,
            VShardMessageType::GraphAlgoContributions,
            VShardMessageType::GraphAlgoSuperstepAck,
            VShardMessageType::GraphAlgoComplete,
            VShardMessageType::TsScatterRequest,
            VShardMessageType::TsScatterResponse,
            VShardMessageType::TsRetentionCommand,
            VShardMessageType::TsRetentionAck,
            VShardMessageType::TsArchiveCommand,
            VShardMessageType::TsArchiveAck,
            VShardMessageType::VectorScatterRequest,
            VShardMessageType::VectorScatterResponse,
            VShardMessageType::SpatialScatterRequest,
            VShardMessageType::SpatialScatterResponse,
            VShardMessageType::CrossShardEvent,
            VShardMessageType::CrossShardEventAck,
            VShardMessageType::NotifyBroadcast,
            VShardMessageType::NotifyBroadcastAck,
        ];
        for msg_type in every_type {
            let _ = route(msg_type, 0);
        }
    }
}