1#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
16pub struct VShardEnvelope {
17 pub version: u16,
19 pub msg_type: VShardMessageType,
21 pub source_node: u64,
23 pub target_node: u64,
25 pub vshard_id: u32,
27 pub payload: Vec<u8>,
29}
30
31#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
33#[repr(u16)]
34pub enum VShardMessageType {
35 SegmentChunk = 1,
37 SegmentComplete = 2,
39 WalTail = 3,
41 RoutingUpdate = 4,
43 RoutingAck = 5,
45 GhostCreate = 10,
47 GhostDelete = 11,
49 GhostVerifyRequest = 12,
51 GhostVerifyResponse = 13,
53 MigrationBaseCopy = 20,
55 GsiForward = 22,
57 EdgeValidation = 23,
59
60 GraphAlgoSuperstep = 30,
63 GraphAlgoContributions = 31,
66 GraphAlgoSuperstepAck = 32,
69 GraphAlgoComplete = 33,
72
73 TsScatterRequest = 40,
76 TsScatterResponse = 41,
78 TsRetentionCommand = 42,
80 TsRetentionAck = 43,
82 TsArchiveCommand = 44,
84 TsArchiveAck = 45,
86
87 VectorScatterRequest = 50,
90 VectorScatterResponse = 51,
92
93 VectorCoarseRouteRequest = 52,
99 VectorCoarseRouteResponse = 53,
102
103 VectorBuildExchangeRequest = 54,
108 VectorBuildExchangeResponse = 55,
111
112 VectorMemRegionRequest = 56,
118 VectorMemRegionResponse = 57,
121
122 SpatialScatterRequest = 60,
125 SpatialScatterResponse = 61,
127
128 CrossShardEvent = 70,
131 CrossShardEventAck = 71,
133 NotifyBroadcast = 72,
135 NotifyBroadcastAck = 73,
137
138 ArrayShardSliceReq = 80,
141 ArrayShardSliceResp = 81,
143 ArrayShardAggReq = 82,
145 ArrayShardAggResp = 83,
147 ArrayShardPutReq = 84,
149 ArrayShardPutResp = 85,
151 ArrayShardDeleteReq = 86,
153 ArrayShardDeleteResp = 87,
155 ArrayShardSurrogateBitmapReq = 88,
157 ArrayShardSurrogateBitmapResp = 89,
159}
160
161pub const WIRE_VERSION: u16 = 2;
166
167impl VShardEnvelope {
168 pub fn new(
169 msg_type: VShardMessageType,
170 source_node: u64,
171 target_node: u64,
172 vshard_id: u32,
173 payload: Vec<u8>,
174 ) -> Self {
175 Self {
176 version: WIRE_VERSION,
177 msg_type,
178 source_node,
179 target_node,
180 vshard_id,
181 payload,
182 }
183 }
184
185 pub fn to_bytes(&self) -> Vec<u8> {
190 let mut buf = Vec::with_capacity(28 + self.payload.len());
191 buf.extend_from_slice(&self.version.to_le_bytes());
192 buf.extend_from_slice(&(self.msg_type as u16).to_le_bytes());
193 buf.extend_from_slice(&self.source_node.to_le_bytes());
194 buf.extend_from_slice(&self.target_node.to_le_bytes());
195 buf.extend_from_slice(&self.vshard_id.to_le_bytes());
196 buf.extend_from_slice(&(self.payload.len() as u32).to_le_bytes());
197 buf.extend_from_slice(&self.payload);
198 buf
199 }
200
201 pub fn from_bytes(buf: &[u8]) -> Option<Self> {
203 if buf.len() < 28 {
204 return None;
205 }
206 let version = u16::from_le_bytes(buf[0..2].try_into().ok()?);
207 let msg_type_raw = u16::from_le_bytes(buf[2..4].try_into().ok()?);
208 let source_node = u64::from_le_bytes(buf[4..12].try_into().ok()?);
209 let target_node = u64::from_le_bytes(buf[12..20].try_into().ok()?);
210 let vshard_id = u32::from_le_bytes(buf[20..24].try_into().ok()?);
211 let payload_len = u32::from_le_bytes(buf[24..28].try_into().ok()?) as usize;
212
213 if buf.len() < 28 + payload_len {
214 return None;
215 }
216 let payload = buf[28..28 + payload_len].to_vec();
217
218 let msg_type = match msg_type_raw {
219 1 => VShardMessageType::SegmentChunk,
220 2 => VShardMessageType::SegmentComplete,
221 3 => VShardMessageType::WalTail,
222 4 => VShardMessageType::RoutingUpdate,
223 5 => VShardMessageType::RoutingAck,
224 10 => VShardMessageType::GhostCreate,
225 11 => VShardMessageType::GhostDelete,
226 12 => VShardMessageType::GhostVerifyRequest,
227 13 => VShardMessageType::GhostVerifyResponse,
228 20 => VShardMessageType::MigrationBaseCopy,
229 22 => VShardMessageType::GsiForward,
230 23 => VShardMessageType::EdgeValidation,
231 30 => VShardMessageType::GraphAlgoSuperstep,
232 31 => VShardMessageType::GraphAlgoContributions,
233 32 => VShardMessageType::GraphAlgoSuperstepAck,
234 33 => VShardMessageType::GraphAlgoComplete,
235 40 => VShardMessageType::TsScatterRequest,
236 41 => VShardMessageType::TsScatterResponse,
237 42 => VShardMessageType::TsRetentionCommand,
238 43 => VShardMessageType::TsRetentionAck,
239 44 => VShardMessageType::TsArchiveCommand,
240 45 => VShardMessageType::TsArchiveAck,
241 50 => VShardMessageType::VectorScatterRequest,
242 51 => VShardMessageType::VectorScatterResponse,
243 52 => VShardMessageType::VectorCoarseRouteRequest,
244 53 => VShardMessageType::VectorCoarseRouteResponse,
245 54 => VShardMessageType::VectorBuildExchangeRequest,
246 55 => VShardMessageType::VectorBuildExchangeResponse,
247 56 => VShardMessageType::VectorMemRegionRequest,
248 57 => VShardMessageType::VectorMemRegionResponse,
249 60 => VShardMessageType::SpatialScatterRequest,
250 61 => VShardMessageType::SpatialScatterResponse,
251 70 => VShardMessageType::CrossShardEvent,
252 71 => VShardMessageType::CrossShardEventAck,
253 72 => VShardMessageType::NotifyBroadcast,
254 73 => VShardMessageType::NotifyBroadcastAck,
255 80 => VShardMessageType::ArrayShardSliceReq,
256 81 => VShardMessageType::ArrayShardSliceResp,
257 82 => VShardMessageType::ArrayShardAggReq,
258 83 => VShardMessageType::ArrayShardAggResp,
259 84 => VShardMessageType::ArrayShardPutReq,
260 85 => VShardMessageType::ArrayShardPutResp,
261 86 => VShardMessageType::ArrayShardDeleteReq,
262 87 => VShardMessageType::ArrayShardDeleteResp,
263 88 => VShardMessageType::ArrayShardSurrogateBitmapReq,
264 89 => VShardMessageType::ArrayShardSurrogateBitmapResp,
265 _ => return None,
266 };
267
268 Some(Self {
269 version,
270 msg_type,
271 source_node,
272 target_node,
273 vshard_id,
274 payload,
275 })
276 }
277}
278
279#[cfg(test)]
280mod tests {
281 use super::*;
282
283 #[test]
284 fn wire_version_is_2() {
285 assert_eq!(WIRE_VERSION, 2, "v2 widened vshard_id to u32");
286 }
287
288 #[test]
289 fn envelope_roundtrip() {
290 let env = VShardEnvelope::new(
291 VShardMessageType::SegmentChunk,
292 1,
293 2,
294 42,
295 b"segment data".to_vec(),
296 );
297 let bytes = env.to_bytes();
298 let decoded = VShardEnvelope::from_bytes(&bytes).unwrap();
299 assert_eq!(env, decoded);
300 }
301
302 #[test]
303 fn all_message_types_roundtrip() {
304 let types = [
305 VShardMessageType::SegmentChunk,
306 VShardMessageType::SegmentComplete,
307 VShardMessageType::WalTail,
308 VShardMessageType::RoutingUpdate,
309 VShardMessageType::RoutingAck,
310 VShardMessageType::GhostCreate,
311 VShardMessageType::GhostDelete,
312 VShardMessageType::GhostVerifyRequest,
313 VShardMessageType::GhostVerifyResponse,
314 VShardMessageType::GraphAlgoSuperstep,
315 VShardMessageType::GraphAlgoContributions,
316 VShardMessageType::GraphAlgoSuperstepAck,
317 VShardMessageType::GraphAlgoComplete,
318 VShardMessageType::TsScatterRequest,
319 VShardMessageType::TsScatterResponse,
320 VShardMessageType::TsRetentionCommand,
321 VShardMessageType::TsRetentionAck,
322 VShardMessageType::TsArchiveCommand,
323 VShardMessageType::TsArchiveAck,
324 VShardMessageType::VectorScatterRequest,
325 VShardMessageType::VectorScatterResponse,
326 VShardMessageType::VectorCoarseRouteRequest,
327 VShardMessageType::VectorCoarseRouteResponse,
328 VShardMessageType::VectorBuildExchangeRequest,
329 VShardMessageType::VectorBuildExchangeResponse,
330 VShardMessageType::VectorMemRegionRequest,
331 VShardMessageType::VectorMemRegionResponse,
332 VShardMessageType::SpatialScatterRequest,
333 VShardMessageType::SpatialScatterResponse,
334 VShardMessageType::CrossShardEvent,
335 VShardMessageType::CrossShardEventAck,
336 VShardMessageType::NotifyBroadcast,
337 VShardMessageType::NotifyBroadcastAck,
338 VShardMessageType::ArrayShardSliceReq,
339 VShardMessageType::ArrayShardSliceResp,
340 VShardMessageType::ArrayShardAggReq,
341 VShardMessageType::ArrayShardAggResp,
342 VShardMessageType::ArrayShardPutReq,
343 VShardMessageType::ArrayShardPutResp,
344 VShardMessageType::ArrayShardDeleteReq,
345 VShardMessageType::ArrayShardDeleteResp,
346 VShardMessageType::ArrayShardSurrogateBitmapReq,
347 VShardMessageType::ArrayShardSurrogateBitmapResp,
348 ];
349
350 for msg_type in types {
351 let env = VShardEnvelope::new(msg_type, 10, 20, 100, vec![1, 2, 3]);
352 let bytes = env.to_bytes();
353 let decoded = VShardEnvelope::from_bytes(&bytes).unwrap();
354 assert_eq!(decoded.msg_type, msg_type);
355 }
356 }
357
358 #[test]
359 fn truncated_buffer_returns_none() {
360 let env = VShardEnvelope::new(VShardMessageType::WalTail, 1, 2, 0, vec![0; 100]);
361 let bytes = env.to_bytes();
362 assert!(VShardEnvelope::from_bytes(&bytes[..50]).is_none());
364 assert!(VShardEnvelope::from_bytes(&bytes[..10]).is_none());
366 }
367
368 #[test]
369 fn empty_payload() {
370 let env = VShardEnvelope::new(VShardMessageType::RoutingAck, 5, 6, 999, vec![]);
371 let bytes = env.to_bytes();
372 assert_eq!(bytes.len(), 28); let decoded = VShardEnvelope::from_bytes(&bytes).unwrap();
374 assert!(decoded.payload.is_empty());
375 }
376
377 #[test]
378 fn unknown_message_type_returns_none() {
379 let mut env =
380 VShardEnvelope::new(VShardMessageType::SegmentChunk, 1, 2, 0, vec![]).to_bytes();
381 env[2] = 0xFF;
383 env[3] = 0xFF;
384 assert!(VShardEnvelope::from_bytes(&env).is_none());
385 }
386
387 #[test]
388 fn wire_format_is_transport_agnostic() {
389 let env = VShardEnvelope::new(VShardMessageType::SegmentChunk, 1, 2, 42, b"data".to_vec());
392
393 let rdma_bytes = env.to_bytes();
394 let quic_bytes = env.to_bytes();
395 assert_eq!(
396 rdma_bytes, quic_bytes,
397 "wire format must be transport-agnostic"
398 );
399 }
400
401 #[test]
402 fn large_vshard_id_roundtrip() {
403 let env = VShardEnvelope::new(
406 VShardMessageType::WalTail,
407 10,
408 20,
409 0x0001_FFFF,
410 b"payload".to_vec(),
411 );
412 let bytes = env.to_bytes();
413 let decoded = VShardEnvelope::from_bytes(&bytes).unwrap();
414 assert_eq!(decoded.vshard_id, 0x0001_FFFFu32);
415 }
416
417 #[test]
418 fn truncated_below_28_returns_none() {
419 let env = VShardEnvelope::new(VShardMessageType::RoutingAck, 1, 2, 0, vec![]);
421 let bytes = env.to_bytes();
422 assert_eq!(bytes.len(), 28);
423 assert!(VShardEnvelope::from_bytes(&bytes[..27]).is_none());
424 }
425}