1#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
14pub struct VShardEnvelope {
15 pub version: u16,
17 pub msg_type: VShardMessageType,
19 pub source_node: u64,
21 pub target_node: u64,
23 pub vshard_id: u16,
25 pub payload: Vec<u8>,
27}
28
29#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
31#[repr(u16)]
32pub enum VShardMessageType {
33 SegmentChunk = 1,
35 SegmentComplete = 2,
37 WalTail = 3,
39 RoutingUpdate = 4,
41 RoutingAck = 5,
43 GhostCreate = 10,
45 GhostDelete = 11,
47 GhostVerifyRequest = 12,
49 GhostVerifyResponse = 13,
51 MigrationBaseCopy = 20,
53 CrossShardForward = 21,
55 GsiForward = 22,
57 EdgeValidation = 23,
59
60 GraphAlgoSuperstep = 30,
63 GraphAlgoContributions = 31,
66 GraphAlgoSuperstepAck = 32,
69 GraphAlgoComplete = 33,
72
73 TsScatterRequest = 40,
76 TsScatterResponse = 41,
78 TsRetentionCommand = 42,
80 TsRetentionAck = 43,
82 TsArchiveCommand = 44,
84 TsArchiveAck = 45,
86
87 VectorScatterRequest = 50,
90 VectorScatterResponse = 51,
92
93 SpatialScatterRequest = 60,
96 SpatialScatterResponse = 61,
98
99 CrossShardEvent = 70,
102 CrossShardEventAck = 71,
104 NotifyBroadcast = 72,
106 NotifyBroadcastAck = 73,
108}
109
/// Wire-format version that `VShardEnvelope::new` stamps into every envelope
/// it builds.
pub const WIRE_VERSION: u16 = 1;
112
113impl VShardEnvelope {
114 pub fn new(
115 msg_type: VShardMessageType,
116 source_node: u64,
117 target_node: u64,
118 vshard_id: u16,
119 payload: Vec<u8>,
120 ) -> Self {
121 Self {
122 version: WIRE_VERSION,
123 msg_type,
124 source_node,
125 target_node,
126 vshard_id,
127 payload,
128 }
129 }
130
131 pub fn to_bytes(&self) -> Vec<u8> {
133 let mut buf = Vec::with_capacity(26 + self.payload.len());
136 buf.extend_from_slice(&self.version.to_le_bytes());
137 buf.extend_from_slice(&(self.msg_type as u16).to_le_bytes());
138 buf.extend_from_slice(&self.source_node.to_le_bytes());
139 buf.extend_from_slice(&self.target_node.to_le_bytes());
140 buf.extend_from_slice(&self.vshard_id.to_le_bytes());
141 buf.extend_from_slice(&(self.payload.len() as u32).to_le_bytes());
142 buf.extend_from_slice(&self.payload);
143 buf
144 }
145
146 pub fn from_bytes(buf: &[u8]) -> Option<Self> {
148 if buf.len() < 26 {
149 return None;
150 }
151 let version = u16::from_le_bytes(buf[0..2].try_into().ok()?);
152 let msg_type_raw = u16::from_le_bytes(buf[2..4].try_into().ok()?);
153 let source_node = u64::from_le_bytes(buf[4..12].try_into().ok()?);
154 let target_node = u64::from_le_bytes(buf[12..20].try_into().ok()?);
155 let vshard_id = u16::from_le_bytes(buf[20..22].try_into().ok()?);
156 let payload_len = u32::from_le_bytes(buf[22..26].try_into().ok()?) as usize;
157
158 if buf.len() < 26 + payload_len {
159 return None;
160 }
161 let payload = buf[26..26 + payload_len].to_vec();
162
163 let msg_type = match msg_type_raw {
164 1 => VShardMessageType::SegmentChunk,
165 2 => VShardMessageType::SegmentComplete,
166 3 => VShardMessageType::WalTail,
167 4 => VShardMessageType::RoutingUpdate,
168 5 => VShardMessageType::RoutingAck,
169 10 => VShardMessageType::GhostCreate,
170 11 => VShardMessageType::GhostDelete,
171 12 => VShardMessageType::GhostVerifyRequest,
172 13 => VShardMessageType::GhostVerifyResponse,
173 20 => VShardMessageType::MigrationBaseCopy,
174 21 => VShardMessageType::CrossShardForward,
175 22 => VShardMessageType::GsiForward,
176 23 => VShardMessageType::EdgeValidation,
177 30 => VShardMessageType::GraphAlgoSuperstep,
178 31 => VShardMessageType::GraphAlgoContributions,
179 32 => VShardMessageType::GraphAlgoSuperstepAck,
180 33 => VShardMessageType::GraphAlgoComplete,
181 40 => VShardMessageType::TsScatterRequest,
182 41 => VShardMessageType::TsScatterResponse,
183 42 => VShardMessageType::TsRetentionCommand,
184 43 => VShardMessageType::TsRetentionAck,
185 44 => VShardMessageType::TsArchiveCommand,
186 45 => VShardMessageType::TsArchiveAck,
187 50 => VShardMessageType::VectorScatterRequest,
188 51 => VShardMessageType::VectorScatterResponse,
189 60 => VShardMessageType::SpatialScatterRequest,
190 61 => VShardMessageType::SpatialScatterResponse,
191 70 => VShardMessageType::CrossShardEvent,
192 71 => VShardMessageType::CrossShardEventAck,
193 72 => VShardMessageType::NotifyBroadcast,
194 73 => VShardMessageType::NotifyBroadcastAck,
195 _ => return None,
196 };
197
198 Some(Self {
199 version,
200 msg_type,
201 source_node,
202 target_node,
203 vshard_id,
204 payload,
205 })
206 }
207}
208
#[cfg(test)]
mod tests {
    use super::*;

    /// A typical envelope survives an encode/decode cycle unchanged.
    #[test]
    fn envelope_roundtrip() {
        let env = VShardEnvelope::new(
            VShardMessageType::SegmentChunk,
            1,
            2,
            42,
            b"segment data".to_vec(),
        );
        let bytes = env.to_bytes();
        let decoded = VShardEnvelope::from_bytes(&bytes).unwrap();
        assert_eq!(env, decoded);
    }

    /// Every variant of `VShardMessageType` must be accepted by the decoder.
    /// Keep this list in sync with the enum when adding variants.
    #[test]
    fn all_message_types_roundtrip() {
        let types = [
            VShardMessageType::SegmentChunk,
            VShardMessageType::SegmentComplete,
            VShardMessageType::WalTail,
            VShardMessageType::RoutingUpdate,
            VShardMessageType::RoutingAck,
            VShardMessageType::GhostCreate,
            VShardMessageType::GhostDelete,
            VShardMessageType::GhostVerifyRequest,
            VShardMessageType::GhostVerifyResponse,
            VShardMessageType::MigrationBaseCopy,
            VShardMessageType::CrossShardForward,
            VShardMessageType::GsiForward,
            VShardMessageType::EdgeValidation,
            VShardMessageType::GraphAlgoSuperstep,
            VShardMessageType::GraphAlgoContributions,
            VShardMessageType::GraphAlgoSuperstepAck,
            VShardMessageType::GraphAlgoComplete,
            VShardMessageType::TsScatterRequest,
            VShardMessageType::TsScatterResponse,
            VShardMessageType::TsRetentionCommand,
            VShardMessageType::TsRetentionAck,
            VShardMessageType::TsArchiveCommand,
            VShardMessageType::TsArchiveAck,
            VShardMessageType::VectorScatterRequest,
            VShardMessageType::VectorScatterResponse,
            VShardMessageType::SpatialScatterRequest,
            VShardMessageType::SpatialScatterResponse,
            VShardMessageType::CrossShardEvent,
            VShardMessageType::CrossShardEventAck,
            VShardMessageType::NotifyBroadcast,
            VShardMessageType::NotifyBroadcastAck,
        ];

        for msg_type in types {
            let env = VShardEnvelope::new(msg_type, 10, 20, 100, vec![1, 2, 3]);
            let bytes = env.to_bytes();
            let decoded = VShardEnvelope::from_bytes(&bytes).unwrap();
            assert_eq!(decoded.msg_type, msg_type);
        }
    }

    /// Buffers cut short inside the payload or inside the header are rejected.
    #[test]
    fn truncated_buffer_returns_none() {
        let env = VShardEnvelope::new(VShardMessageType::WalTail, 1, 2, 0, vec![0; 100]);
        let bytes = env.to_bytes();
        // Header intact, payload incomplete.
        assert!(VShardEnvelope::from_bytes(&bytes[..50]).is_none());
        // Header itself incomplete.
        assert!(VShardEnvelope::from_bytes(&bytes[..10]).is_none());
        // Nothing at all.
        assert!(VShardEnvelope::from_bytes(&[]).is_none());
    }

    /// A length field that promises more payload than the buffer holds is rejected.
    #[test]
    fn declared_length_beyond_buffer_returns_none() {
        let mut bytes =
            VShardEnvelope::new(VShardMessageType::WalTail, 1, 2, 0, vec![7, 8, 9]).to_bytes();
        bytes[22..26].copy_from_slice(&5u32.to_le_bytes());
        assert!(VShardEnvelope::from_bytes(&bytes).is_none());
    }

    /// Bytes following the declared payload do not affect decoding.
    #[test]
    fn trailing_bytes_are_ignored() {
        let env = VShardEnvelope::new(VShardMessageType::WalTail, 1, 2, 7, vec![1, 2, 3]);
        let mut bytes = env.to_bytes();
        bytes.extend_from_slice(&[0xAA, 0xBB]);
        assert_eq!(VShardEnvelope::from_bytes(&bytes), Some(env));
    }

    /// An empty payload encodes to exactly the 26-byte header.
    #[test]
    fn empty_payload() {
        let env = VShardEnvelope::new(VShardMessageType::RoutingAck, 5, 6, 999, vec![]);
        let bytes = env.to_bytes();
        assert_eq!(bytes.len(), 26);
        let decoded = VShardEnvelope::from_bytes(&bytes).unwrap();
        assert!(decoded.payload.is_empty());
    }

    /// A tag that matches no `VShardMessageType` variant is rejected.
    #[test]
    fn unknown_message_type_returns_none() {
        let mut env =
            VShardEnvelope::new(VShardMessageType::SegmentChunk, 1, 2, 0, vec![]).to_bytes();
        // Overwrite the little-endian tag at bytes 2..4 with 0xFFFF.
        env[2] = 0xFF;
        env[3] = 0xFF;
        assert!(VShardEnvelope::from_bytes(&env).is_none());
    }

    /// Encoding is deterministic and matches the documented byte layout.
    #[test]
    fn wire_format_is_transport_agnostic() {
        let env = VShardEnvelope::new(VShardMessageType::SegmentChunk, 1, 2, 42, b"data".to_vec());

        let rdma_bytes = env.to_bytes();
        let quic_bytes = env.to_bytes();
        assert_eq!(
            rdma_bytes, quic_bytes,
            "wire format must be transport-agnostic"
        );

        // Pin the exact layout so accidental format changes are caught.
        let mut expected = Vec::new();
        expected.extend_from_slice(&env.version.to_le_bytes());
        expected.extend_from_slice(&1u16.to_le_bytes()); // SegmentChunk
        expected.extend_from_slice(&1u64.to_le_bytes()); // source_node
        expected.extend_from_slice(&2u64.to_le_bytes()); // target_node
        expected.extend_from_slice(&42u16.to_le_bytes()); // vshard_id
        expected.extend_from_slice(&4u32.to_le_bytes()); // payload length
        expected.extend_from_slice(b"data");
        assert_eq!(rdma_bytes, expected);
    }
}