// SPDX-License-Identifier: BUSL-1.1

//! Transport-agnostic vShard envelope.
//!
//! "The on-wire format for vShard migration (segment files, WAL tail,
//! routing metadata) MUST be identical regardless of whether the transport
//! is RDMA or QUIC/TCP. The transport layer is a dumb pipe; serialization
//! logic MUST NOT branch on transport type."
//!
//! This module defines the canonical wire format for all vShard-related
//! messages. Both RDMA and QUIC paths serialize/deserialize using this
//! format — no transport-specific branches.
/// Envelope wrapping all vShard wire messages.
///
/// The canonical binary layout is produced/consumed by `to_bytes` /
/// `from_bytes`; the serde derives additionally allow embedding the
/// envelope in self-describing encodings (e.g. for debugging/logging).
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct VShardEnvelope {
    /// Protocol version for forward compatibility.
    pub version: u16,
    /// Message type discriminant.
    pub msg_type: VShardMessageType,
    /// Source node ID.
    pub source_node: u64,
    /// Target node ID.
    pub target_node: u64,
    /// vShard being referenced.
    pub vshard_id: u32,
    /// Opaque payload (type-dependent).
    pub payload: Vec<u8>,
}
30
/// Message types for vShard wire protocol.
///
/// Discriminants are part of the wire format and MUST never be reused or
/// renumbered. Gaps between ranges (e.g. 6–9, 14–19) are reserved for
/// future messages in the corresponding feature family.
///
/// NOTE(review): `from_bytes` decodes these via a manual `match` on the raw
/// u16 — when adding a variant here, add the matching arm there as well.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[repr(u16)]
pub enum VShardMessageType {
    /// Phase 1: Segment file chunk during base copy.
    SegmentChunk = 1,
    /// Phase 1: Segment transfer complete marker.
    SegmentComplete = 2,
    /// Phase 2: WAL tail entries for catch-up.
    WalTail = 3,
    /// Phase 3: Routing table update (atomic cut-over).
    RoutingUpdate = 4,
    /// Routing table acknowledgement.
    RoutingAck = 5,
    /// Ghost stub creation notification.
    GhostCreate = 10,
    /// Ghost stub deletion notification.
    GhostDelete = 11,
    /// Anti-entropy sweep query.
    GhostVerifyRequest = 12,
    /// Anti-entropy sweep response.
    GhostVerifyResponse = 13,
    /// Migration base-copy segment data.
    MigrationBaseCopy = 20,
    /// GSI forward entry.
    GsiForward = 22,
    /// Edge validation request.
    EdgeValidation = 23,

    // ── Graph Algorithm BSP (Bulk Synchronous Parallel) ──
    /// Superstep barrier: coordinator tells all shards to begin iteration N.
    GraphAlgoSuperstep = 30,
    /// Boundary vertex contributions: shard sends rank contributions for
    /// vertices owned by the target shard (scatter phase).
    GraphAlgoContributions = 31,
    /// Superstep complete: shard reports local convergence delta and
    /// vertex count to coordinator (gather phase).
    GraphAlgoSuperstepAck = 32,
    /// Algorithm complete: coordinator broadcasts final signal with
    /// global convergence status.
    GraphAlgoComplete = 33,

    // ── Timeseries Distributed Aggregation ──
    /// Scatter: coordinator sends aggregation query to a shard.
    TsScatterRequest = 40,
    /// Gather: shard responds with partial aggregates.
    TsScatterResponse = 41,
    /// Coordinator broadcasts retention command to all shards.
    TsRetentionCommand = 42,
    /// Shard acknowledges retention execution.
    TsRetentionAck = 43,
    /// S3 archival command: coordinator tells shard to archive partitions.
    TsArchiveCommand = 44,
    /// S3 archival acknowledgement.
    TsArchiveAck = 45,

    // ── Distributed Vector Search ──
    /// Scatter: coordinator sends k-NN query to a shard.
    VectorScatterRequest = 50,
    /// Gather: shard responds with local top-K hits.
    VectorScatterResponse = 51,

    // ── Compass: coarse-code routing ──
    /// Phase 1 request: coordinator asks a shard for its coarse routing
    /// descriptor (learned coarse codes, centroid summary, or equivalent).
    /// The shard responds with `VectorCoarseRouteResponse` before the
    /// coordinator selects the shard subset for the fine search phase.
    VectorCoarseRouteRequest = 52,
    /// Phase 1 response: shard returns its coarse routing descriptor so
    /// the coordinator can decide whether to include it in phase 2.
    VectorCoarseRouteResponse = 53,

    // ── SPIRE: build-time centroid exchange ──
    /// Build-time request: a shard sends its IVF centroid table to a peer
    /// so the peer can build cross-shard centroid knowledge.
    /// Sent shard-to-shard without coordinator involvement.
    VectorBuildExchangeRequest = 54,
    /// Build-time response: receiving shard acknowledges and optionally
    /// echoes its own centroid summary back to the sender.
    VectorBuildExchangeResponse = 55,

    // ── CoTra-RDMA: one-sided read support ──
    /// Registration request: a shard asks a peer to expose a named memory
    /// region (e.g. a pinned HNSW graph segment) for one-sided reads.
    /// The peer responds with `VectorMemRegionInfo` containing the address
    /// and rkey, or indicates the region is unavailable.
    VectorMemRegionRequest = 56,
    /// Registration response: peer returns address/rkey for the requested
    /// memory region, or `available = false` when not supported.
    VectorMemRegionResponse = 57,

    // ── Distributed Spatial Queries ──
    /// Scatter: coordinator sends spatial predicate query to a shard.
    SpatialScatterRequest = 60,
    /// Gather: shard responds with matching document IDs.
    SpatialScatterResponse = 61,

    // ── Event Plane Cross-Shard Delivery ──
    /// Cross-shard event write request (trigger DML, CDC, etc.).
    CrossShardEvent = 70,
    /// Acknowledgement for a cross-shard event write.
    CrossShardEventAck = 71,
    /// Broadcast NOTIFY message to all peers (LISTEN/NOTIFY cluster-wide).
    NotifyBroadcast = 72,
    /// Acknowledgement for a NOTIFY broadcast.
    NotifyBroadcastAck = 73,

    // ── Distributed Array (Hilbert-sharded sparse arrays) ──
    /// Scatter: coordinator sends a coord-range slice query to a shard.
    ArrayShardSliceReq = 80,
    /// Gather: shard responds with matching row bytes.
    ArrayShardSliceResp = 81,
    /// Scatter: coordinator sends an aggregate query to a shard.
    ArrayShardAggReq = 82,
    /// Gather: shard responds with partial aggregate(s).
    ArrayShardAggResp = 83,
    /// Coordinator forwards a cell write batch to the owning shard.
    ArrayShardPutReq = 84,
    /// Shard acknowledges a cell write batch.
    ArrayShardPutResp = 85,
    /// Coordinator forwards a coord-based delete to the owning shard.
    ArrayShardDeleteReq = 86,
    /// Shard acknowledges a coord-based delete.
    ArrayShardDeleteResp = 87,
    /// Scatter: coordinator requests a surrogate bitmap scan from a shard.
    ArrayShardSurrogateBitmapReq = 88,
    /// Gather: shard returns the surrogate bitmap for matching cells.
    ArrayShardSurrogateBitmapResp = 89,
}
160
/// Current wire protocol version.
///
/// v2 widens `vshard_id` u16→u32 in the binary frame, increasing min header
/// size from 26 to 28 bytes.
pub const WIRE_VERSION: u16 = 2;
166
167impl VShardEnvelope {
168    pub fn new(
169        msg_type: VShardMessageType,
170        source_node: u64,
171        target_node: u64,
172        vshard_id: u32,
173        payload: Vec<u8>,
174    ) -> Self {
175        Self {
176            version: WIRE_VERSION,
177            msg_type,
178            source_node,
179            target_node,
180            vshard_id,
181            payload,
182        }
183    }
184
185    /// Serialize to bytes (transport-agnostic).
186    ///
187    /// Binary format (v2, all little-endian):
188    ///   version(2) + msg_type(2) + source(8) + target(8) + vshard(4) + payload_len(4) + payload
189    pub fn to_bytes(&self) -> Vec<u8> {
190        let mut buf = Vec::with_capacity(28 + self.payload.len());
191        buf.extend_from_slice(&self.version.to_le_bytes());
192        buf.extend_from_slice(&(self.msg_type as u16).to_le_bytes());
193        buf.extend_from_slice(&self.source_node.to_le_bytes());
194        buf.extend_from_slice(&self.target_node.to_le_bytes());
195        buf.extend_from_slice(&self.vshard_id.to_le_bytes());
196        buf.extend_from_slice(&(self.payload.len() as u32).to_le_bytes());
197        buf.extend_from_slice(&self.payload);
198        buf
199    }
200
201    /// Deserialize from bytes.
202    pub fn from_bytes(buf: &[u8]) -> Option<Self> {
203        if buf.len() < 28 {
204            return None;
205        }
206        let version = u16::from_le_bytes(buf[0..2].try_into().ok()?);
207        let msg_type_raw = u16::from_le_bytes(buf[2..4].try_into().ok()?);
208        let source_node = u64::from_le_bytes(buf[4..12].try_into().ok()?);
209        let target_node = u64::from_le_bytes(buf[12..20].try_into().ok()?);
210        let vshard_id = u32::from_le_bytes(buf[20..24].try_into().ok()?);
211        let payload_len = u32::from_le_bytes(buf[24..28].try_into().ok()?) as usize;
212
213        if buf.len() < 28 + payload_len {
214            return None;
215        }
216        let payload = buf[28..28 + payload_len].to_vec();
217
218        let msg_type = match msg_type_raw {
219            1 => VShardMessageType::SegmentChunk,
220            2 => VShardMessageType::SegmentComplete,
221            3 => VShardMessageType::WalTail,
222            4 => VShardMessageType::RoutingUpdate,
223            5 => VShardMessageType::RoutingAck,
224            10 => VShardMessageType::GhostCreate,
225            11 => VShardMessageType::GhostDelete,
226            12 => VShardMessageType::GhostVerifyRequest,
227            13 => VShardMessageType::GhostVerifyResponse,
228            20 => VShardMessageType::MigrationBaseCopy,
229            22 => VShardMessageType::GsiForward,
230            23 => VShardMessageType::EdgeValidation,
231            30 => VShardMessageType::GraphAlgoSuperstep,
232            31 => VShardMessageType::GraphAlgoContributions,
233            32 => VShardMessageType::GraphAlgoSuperstepAck,
234            33 => VShardMessageType::GraphAlgoComplete,
235            40 => VShardMessageType::TsScatterRequest,
236            41 => VShardMessageType::TsScatterResponse,
237            42 => VShardMessageType::TsRetentionCommand,
238            43 => VShardMessageType::TsRetentionAck,
239            44 => VShardMessageType::TsArchiveCommand,
240            45 => VShardMessageType::TsArchiveAck,
241            50 => VShardMessageType::VectorScatterRequest,
242            51 => VShardMessageType::VectorScatterResponse,
243            52 => VShardMessageType::VectorCoarseRouteRequest,
244            53 => VShardMessageType::VectorCoarseRouteResponse,
245            54 => VShardMessageType::VectorBuildExchangeRequest,
246            55 => VShardMessageType::VectorBuildExchangeResponse,
247            56 => VShardMessageType::VectorMemRegionRequest,
248            57 => VShardMessageType::VectorMemRegionResponse,
249            60 => VShardMessageType::SpatialScatterRequest,
250            61 => VShardMessageType::SpatialScatterResponse,
251            70 => VShardMessageType::CrossShardEvent,
252            71 => VShardMessageType::CrossShardEventAck,
253            72 => VShardMessageType::NotifyBroadcast,
254            73 => VShardMessageType::NotifyBroadcastAck,
255            80 => VShardMessageType::ArrayShardSliceReq,
256            81 => VShardMessageType::ArrayShardSliceResp,
257            82 => VShardMessageType::ArrayShardAggReq,
258            83 => VShardMessageType::ArrayShardAggResp,
259            84 => VShardMessageType::ArrayShardPutReq,
260            85 => VShardMessageType::ArrayShardPutResp,
261            86 => VShardMessageType::ArrayShardDeleteReq,
262            87 => VShardMessageType::ArrayShardDeleteResp,
263            88 => VShardMessageType::ArrayShardSurrogateBitmapReq,
264            89 => VShardMessageType::ArrayShardSurrogateBitmapResp,
265            _ => return None,
266        };
267
268        Some(Self {
269            version,
270            msg_type,
271            source_node,
272            target_node,
273            vshard_id,
274            payload,
275        })
276    }
277}
278
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn wire_version_is_2() {
        assert_eq!(WIRE_VERSION, 2, "v2 widened vshard_id to u32");
    }

    #[test]
    fn envelope_roundtrip() {
        let env = VShardEnvelope::new(
            VShardMessageType::SegmentChunk,
            1,
            2,
            42,
            b"segment data".to_vec(),
        );
        let bytes = env.to_bytes();
        let decoded = VShardEnvelope::from_bytes(&bytes).unwrap();
        assert_eq!(env, decoded);
    }

    #[test]
    fn all_message_types_roundtrip() {
        // Every declared variant must survive encode → decode. Previously
        // MigrationBaseCopy, GsiForward, and EdgeValidation were missing,
        // leaving their `from_bytes` arms untested.
        let types = [
            VShardMessageType::SegmentChunk,
            VShardMessageType::SegmentComplete,
            VShardMessageType::WalTail,
            VShardMessageType::RoutingUpdate,
            VShardMessageType::RoutingAck,
            VShardMessageType::GhostCreate,
            VShardMessageType::GhostDelete,
            VShardMessageType::GhostVerifyRequest,
            VShardMessageType::GhostVerifyResponse,
            VShardMessageType::MigrationBaseCopy,
            VShardMessageType::GsiForward,
            VShardMessageType::EdgeValidation,
            VShardMessageType::GraphAlgoSuperstep,
            VShardMessageType::GraphAlgoContributions,
            VShardMessageType::GraphAlgoSuperstepAck,
            VShardMessageType::GraphAlgoComplete,
            VShardMessageType::TsScatterRequest,
            VShardMessageType::TsScatterResponse,
            VShardMessageType::TsRetentionCommand,
            VShardMessageType::TsRetentionAck,
            VShardMessageType::TsArchiveCommand,
            VShardMessageType::TsArchiveAck,
            VShardMessageType::VectorScatterRequest,
            VShardMessageType::VectorScatterResponse,
            VShardMessageType::VectorCoarseRouteRequest,
            VShardMessageType::VectorCoarseRouteResponse,
            VShardMessageType::VectorBuildExchangeRequest,
            VShardMessageType::VectorBuildExchangeResponse,
            VShardMessageType::VectorMemRegionRequest,
            VShardMessageType::VectorMemRegionResponse,
            VShardMessageType::SpatialScatterRequest,
            VShardMessageType::SpatialScatterResponse,
            VShardMessageType::CrossShardEvent,
            VShardMessageType::CrossShardEventAck,
            VShardMessageType::NotifyBroadcast,
            VShardMessageType::NotifyBroadcastAck,
            VShardMessageType::ArrayShardSliceReq,
            VShardMessageType::ArrayShardSliceResp,
            VShardMessageType::ArrayShardAggReq,
            VShardMessageType::ArrayShardAggResp,
            VShardMessageType::ArrayShardPutReq,
            VShardMessageType::ArrayShardPutResp,
            VShardMessageType::ArrayShardDeleteReq,
            VShardMessageType::ArrayShardDeleteResp,
            VShardMessageType::ArrayShardSurrogateBitmapReq,
            VShardMessageType::ArrayShardSurrogateBitmapResp,
        ];

        for msg_type in types {
            let env = VShardEnvelope::new(msg_type, 10, 20, 100, vec![1, 2, 3]);
            let bytes = env.to_bytes();
            let decoded = VShardEnvelope::from_bytes(&bytes).unwrap();
            assert_eq!(decoded.msg_type, msg_type);
        }
    }

    #[test]
    fn truncated_buffer_returns_none() {
        let env = VShardEnvelope::new(VShardMessageType::WalTail, 1, 2, 0, vec![0; 100]);
        let bytes = env.to_bytes();
        // Truncate payload.
        assert!(VShardEnvelope::from_bytes(&bytes[..50]).is_none());
        // Truncate header.
        assert!(VShardEnvelope::from_bytes(&bytes[..10]).is_none());
    }

    #[test]
    fn empty_payload() {
        let env = VShardEnvelope::new(VShardMessageType::RoutingAck, 5, 6, 999, vec![]);
        let bytes = env.to_bytes();
        assert_eq!(bytes.len(), 28); // header only (v2: vshard_id widened u16→u32)
        let decoded = VShardEnvelope::from_bytes(&bytes).unwrap();
        assert!(decoded.payload.is_empty());
    }

    #[test]
    fn unknown_message_type_returns_none() {
        let mut env =
            VShardEnvelope::new(VShardMessageType::SegmentChunk, 1, 2, 0, vec![]).to_bytes();
        // Corrupt msg_type to unknown value.
        env[2] = 0xFF;
        env[3] = 0xFF;
        assert!(VShardEnvelope::from_bytes(&env).is_none());
    }

    #[test]
    fn wire_format_is_transport_agnostic() {
        // The same bytes work whether sent over RDMA or QUIC.
        // This test documents the invariant: no transport-specific branching.
        let env = VShardEnvelope::new(VShardMessageType::SegmentChunk, 1, 2, 42, b"data".to_vec());

        let rdma_bytes = env.to_bytes();
        let quic_bytes = env.to_bytes();
        assert_eq!(
            rdma_bytes, quic_bytes,
            "wire format must be transport-agnostic"
        );
    }

    #[test]
    fn large_vshard_id_roundtrip() {
        // vshard_id is now u32; ensure values above old u16::MAX round-trip
        // without truncation.
        let env = VShardEnvelope::new(
            VShardMessageType::WalTail,
            10,
            20,
            0x0001_FFFF,
            b"payload".to_vec(),
        );
        let bytes = env.to_bytes();
        let decoded = VShardEnvelope::from_bytes(&bytes).unwrap();
        assert_eq!(decoded.vshard_id, 0x0001_FFFFu32);
    }

    #[test]
    fn truncated_below_28_returns_none() {
        // With v2 header = 28 bytes, a 27-byte buffer must be rejected.
        let env = VShardEnvelope::new(VShardMessageType::RoutingAck, 1, 2, 0, vec![]);
        let bytes = env.to_bytes();
        assert_eq!(bytes.len(), 28);
        assert!(VShardEnvelope::from_bytes(&bytes[..27]).is_none());
    }
}
425}