Skip to main content

nodedb_cluster/
wire.rs

1//! Transport-agnostic vShard envelope.
2//!
3//! "The on-wire format for vShard migration (segment files, WAL tail,
4//! routing metadata) MUST be identical regardless of whether the transport
5//! is RDMA or QUIC/TCP. The transport layer is a dumb pipe; serialization
6//! logic MUST NOT branch on transport type."
7//!
8//! This module defines the canonical wire format for all vShard-related
9//! messages. Both RDMA and QUIC paths serialize/deserialize using this
10//! format — no transport-specific branches.
11
12/// Envelope wrapping all vShard wire messages.
13#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
14pub struct VShardEnvelope {
15    /// Protocol version for forward compatibility.
16    pub version: u16,
17    /// Message type discriminant.
18    pub msg_type: VShardMessageType,
19    /// Source node ID.
20    pub source_node: u64,
21    /// Target node ID.
22    pub target_node: u64,
23    /// vShard being referenced.
24    pub vshard_id: u16,
25    /// Opaque payload (type-dependent).
26    pub payload: Vec<u8>,
27}
28
29/// Message types for vShard wire protocol.
30#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
31#[repr(u16)]
32pub enum VShardMessageType {
33    /// Phase 1: Segment file chunk during base copy.
34    SegmentChunk = 1,
35    /// Phase 1: Segment transfer complete marker.
36    SegmentComplete = 2,
37    /// Phase 2: WAL tail entries for catch-up.
38    WalTail = 3,
39    /// Phase 3: Routing table update (atomic cut-over).
40    RoutingUpdate = 4,
41    /// Routing table acknowledgement.
42    RoutingAck = 5,
43    /// Ghost stub creation notification.
44    GhostCreate = 10,
45    /// Ghost stub deletion notification.
46    GhostDelete = 11,
47    /// Anti-entropy sweep query.
48    GhostVerifyRequest = 12,
49    /// Anti-entropy sweep response.
50    GhostVerifyResponse = 13,
51    /// Migration base-copy segment data.
52    MigrationBaseCopy = 20,
53    /// Cross-shard transaction forward.
54    CrossShardForward = 21,
55    /// GSI forward entry.
56    GsiForward = 22,
57    /// Edge validation request.
58    EdgeValidation = 23,
59
60    // ── Graph Algorithm BSP (Bulk Synchronous Parallel) ──
61    /// Superstep barrier: coordinator tells all shards to begin iteration N.
62    GraphAlgoSuperstep = 30,
63    /// Boundary vertex contributions: shard sends rank contributions for
64    /// vertices owned by the target shard (scatter phase).
65    GraphAlgoContributions = 31,
66    /// Superstep complete: shard reports local convergence delta and
67    /// vertex count to coordinator (gather phase).
68    GraphAlgoSuperstepAck = 32,
69    /// Algorithm complete: coordinator broadcasts final signal with
70    /// global convergence status.
71    GraphAlgoComplete = 33,
72
73    // ── Timeseries Distributed Aggregation ──
74    /// Scatter: coordinator sends aggregation query to a shard.
75    TsScatterRequest = 40,
76    /// Gather: shard responds with partial aggregates.
77    TsScatterResponse = 41,
78    /// Coordinator broadcasts retention command to all shards.
79    TsRetentionCommand = 42,
80    /// Shard acknowledges retention execution.
81    TsRetentionAck = 43,
82    /// S3 archival command: coordinator tells shard to archive partitions.
83    TsArchiveCommand = 44,
84    /// S3 archival acknowledgement.
85    TsArchiveAck = 45,
86
87    // ── Distributed Vector Search ──
88    /// Scatter: coordinator sends k-NN query to a shard.
89    VectorScatterRequest = 50,
90    /// Gather: shard responds with local top-K hits.
91    VectorScatterResponse = 51,
92
93    // ── Distributed Spatial Queries ──
94    /// Scatter: coordinator sends spatial predicate query to a shard.
95    SpatialScatterRequest = 60,
96    /// Gather: shard responds with matching document IDs.
97    SpatialScatterResponse = 61,
98
99    // ── Event Plane Cross-Shard Delivery ──
100    /// Cross-shard event write request (trigger DML, CDC, etc.).
101    CrossShardEvent = 70,
102    /// Acknowledgement for a cross-shard event write.
103    CrossShardEventAck = 71,
104    /// Broadcast NOTIFY message to all peers (LISTEN/NOTIFY cluster-wide).
105    NotifyBroadcast = 72,
106    /// Acknowledgement for a NOTIFY broadcast.
107    NotifyBroadcastAck = 73,
108}
109
/// Wire protocol version that [`VShardEnvelope::new`] stamps into every envelope it builds.
pub const WIRE_VERSION: u16 = 0x0001;
112
113impl VShardEnvelope {
114    pub fn new(
115        msg_type: VShardMessageType,
116        source_node: u64,
117        target_node: u64,
118        vshard_id: u16,
119        payload: Vec<u8>,
120    ) -> Self {
121        Self {
122            version: WIRE_VERSION,
123            msg_type,
124            source_node,
125            target_node,
126            vshard_id,
127            payload,
128        }
129    }
130
131    /// Serialize to bytes (transport-agnostic).
132    pub fn to_bytes(&self) -> Vec<u8> {
133        // Simple binary format: version(2) + msg_type(2) + source(8) + target(8)
134        // + vshard(2) + payload_len(4) + payload
135        let mut buf = Vec::with_capacity(26 + self.payload.len());
136        buf.extend_from_slice(&self.version.to_le_bytes());
137        buf.extend_from_slice(&(self.msg_type as u16).to_le_bytes());
138        buf.extend_from_slice(&self.source_node.to_le_bytes());
139        buf.extend_from_slice(&self.target_node.to_le_bytes());
140        buf.extend_from_slice(&self.vshard_id.to_le_bytes());
141        buf.extend_from_slice(&(self.payload.len() as u32).to_le_bytes());
142        buf.extend_from_slice(&self.payload);
143        buf
144    }
145
146    /// Deserialize from bytes.
147    pub fn from_bytes(buf: &[u8]) -> Option<Self> {
148        if buf.len() < 26 {
149            return None;
150        }
151        let version = u16::from_le_bytes(buf[0..2].try_into().ok()?);
152        let msg_type_raw = u16::from_le_bytes(buf[2..4].try_into().ok()?);
153        let source_node = u64::from_le_bytes(buf[4..12].try_into().ok()?);
154        let target_node = u64::from_le_bytes(buf[12..20].try_into().ok()?);
155        let vshard_id = u16::from_le_bytes(buf[20..22].try_into().ok()?);
156        let payload_len = u32::from_le_bytes(buf[22..26].try_into().ok()?) as usize;
157
158        if buf.len() < 26 + payload_len {
159            return None;
160        }
161        let payload = buf[26..26 + payload_len].to_vec();
162
163        let msg_type = match msg_type_raw {
164            1 => VShardMessageType::SegmentChunk,
165            2 => VShardMessageType::SegmentComplete,
166            3 => VShardMessageType::WalTail,
167            4 => VShardMessageType::RoutingUpdate,
168            5 => VShardMessageType::RoutingAck,
169            10 => VShardMessageType::GhostCreate,
170            11 => VShardMessageType::GhostDelete,
171            12 => VShardMessageType::GhostVerifyRequest,
172            13 => VShardMessageType::GhostVerifyResponse,
173            20 => VShardMessageType::MigrationBaseCopy,
174            21 => VShardMessageType::CrossShardForward,
175            22 => VShardMessageType::GsiForward,
176            23 => VShardMessageType::EdgeValidation,
177            30 => VShardMessageType::GraphAlgoSuperstep,
178            31 => VShardMessageType::GraphAlgoContributions,
179            32 => VShardMessageType::GraphAlgoSuperstepAck,
180            33 => VShardMessageType::GraphAlgoComplete,
181            40 => VShardMessageType::TsScatterRequest,
182            41 => VShardMessageType::TsScatterResponse,
183            42 => VShardMessageType::TsRetentionCommand,
184            43 => VShardMessageType::TsRetentionAck,
185            44 => VShardMessageType::TsArchiveCommand,
186            45 => VShardMessageType::TsArchiveAck,
187            50 => VShardMessageType::VectorScatterRequest,
188            51 => VShardMessageType::VectorScatterResponse,
189            60 => VShardMessageType::SpatialScatterRequest,
190            61 => VShardMessageType::SpatialScatterResponse,
191            70 => VShardMessageType::CrossShardEvent,
192            71 => VShardMessageType::CrossShardEventAck,
193            72 => VShardMessageType::NotifyBroadcast,
194            73 => VShardMessageType::NotifyBroadcastAck,
195            _ => return None,
196        };
197
198        Some(Self {
199            version,
200            msg_type,
201            source_node,
202            target_node,
203            vshard_id,
204            payload,
205        })
206    }
207}
208
#[cfg(test)]
mod tests {
    use super::*;

    /// Every `VShardMessageType` variant, in declaration order.
    ///
    /// `decoder_accepts_exactly_the_listed_codes` fails if this list and the
    /// decoder's code table drift apart.
    const ALL_TYPES: [VShardMessageType; 31] = [
        VShardMessageType::SegmentChunk,
        VShardMessageType::SegmentComplete,
        VShardMessageType::WalTail,
        VShardMessageType::RoutingUpdate,
        VShardMessageType::RoutingAck,
        VShardMessageType::GhostCreate,
        VShardMessageType::GhostDelete,
        VShardMessageType::GhostVerifyRequest,
        VShardMessageType::GhostVerifyResponse,
        VShardMessageType::MigrationBaseCopy,
        VShardMessageType::CrossShardForward,
        VShardMessageType::GsiForward,
        VShardMessageType::EdgeValidation,
        VShardMessageType::GraphAlgoSuperstep,
        VShardMessageType::GraphAlgoContributions,
        VShardMessageType::GraphAlgoSuperstepAck,
        VShardMessageType::GraphAlgoComplete,
        VShardMessageType::TsScatterRequest,
        VShardMessageType::TsScatterResponse,
        VShardMessageType::TsRetentionCommand,
        VShardMessageType::TsRetentionAck,
        VShardMessageType::TsArchiveCommand,
        VShardMessageType::TsArchiveAck,
        VShardMessageType::VectorScatterRequest,
        VShardMessageType::VectorScatterResponse,
        VShardMessageType::SpatialScatterRequest,
        VShardMessageType::SpatialScatterResponse,
        VShardMessageType::CrossShardEvent,
        VShardMessageType::CrossShardEventAck,
        VShardMessageType::NotifyBroadcast,
        VShardMessageType::NotifyBroadcastAck,
    ];

    #[test]
    fn envelope_roundtrip() {
        let env = VShardEnvelope::new(
            VShardMessageType::SegmentChunk,
            1,
            2,
            42,
            b"segment data".to_vec(),
        );
        let bytes = env.to_bytes();
        let decoded = VShardEnvelope::from_bytes(&bytes).unwrap();
        assert_eq!(env, decoded);
    }

    #[test]
    fn all_message_types_roundtrip() {
        for msg_type in ALL_TYPES {
            let env = VShardEnvelope::new(msg_type, 10, 20, 100, vec![1, 2, 3]);
            let bytes = env.to_bytes();
            // The header carries the enum's explicit discriminant.
            assert_eq!(u16::from_le_bytes([bytes[2], bytes[3]]), msg_type as u16);
            let decoded = VShardEnvelope::from_bytes(&bytes).unwrap();
            assert_eq!(decoded, env);
        }
    }

    #[test]
    fn decoder_accepts_exactly_the_listed_codes() {
        let mut bytes =
            VShardEnvelope::new(VShardMessageType::SegmentChunk, 0, 0, 0, vec![]).to_bytes();
        let mut accepted = 0usize;
        for code in 0..=u16::MAX {
            bytes[2..4].copy_from_slice(&code.to_le_bytes());
            if let Some(env) = VShardEnvelope::from_bytes(&bytes) {
                assert_eq!(env.msg_type as u16, code, "code {code} decoded to wrong variant");
                assert!(ALL_TYPES.contains(&env.msg_type), "{:?} missing from ALL_TYPES", env.msg_type);
                accepted += 1;
            }
        }
        assert_eq!(accepted, ALL_TYPES.len());
    }

    #[test]
    fn truncated_buffer_returns_none() {
        let env = VShardEnvelope::new(VShardMessageType::WalTail, 1, 2, 0, vec![0; 100]);
        let bytes = env.to_bytes();
        // Truncate payload.
        assert!(VShardEnvelope::from_bytes(&bytes[..50]).is_none());
        // Truncate header.
        assert!(VShardEnvelope::from_bytes(&bytes[..10]).is_none());
        // One byte short of a full header.
        assert!(VShardEnvelope::from_bytes(&bytes[..25]).is_none());
        // Nothing at all.
        assert!(VShardEnvelope::from_bytes(&[]).is_none());
    }

    #[test]
    fn oversized_length_field_returns_none() {
        let mut bytes =
            VShardEnvelope::new(VShardMessageType::WalTail, 1, 2, 0, vec![7; 8]).to_bytes();
        // Claim far more payload than the buffer holds.
        bytes[22..26].copy_from_slice(&u32::MAX.to_le_bytes());
        assert!(VShardEnvelope::from_bytes(&bytes).is_none());
    }

    #[test]
    fn empty_payload() {
        let env = VShardEnvelope::new(VShardMessageType::RoutingAck, 5, 6, 999, vec![]);
        let bytes = env.to_bytes();
        assert_eq!(bytes.len(), 26); // header only
        let decoded = VShardEnvelope::from_bytes(&bytes).unwrap();
        assert!(decoded.payload.is_empty());
    }

    #[test]
    fn unknown_message_type_returns_none() {
        let mut env =
            VShardEnvelope::new(VShardMessageType::SegmentChunk, 1, 2, 0, vec![]).to_bytes();
        // Corrupt msg_type to unknown value.
        env[2] = 0xFF;
        env[3] = 0xFF;
        assert!(VShardEnvelope::from_bytes(&env).is_none());
    }

    #[test]
    fn wire_layout_is_little_endian_and_stable() {
        // Golden bytes: changing the layout (or WIRE_VERSION) must be a
        // deliberate decision that also updates this test.
        let env = VShardEnvelope::new(VShardMessageType::SegmentChunk, 1, 2, 42, b"data".to_vec());
        let expected: [u8; 30] = [
            0x01, 0x00, // version
            0x01, 0x00, // msg_type (SegmentChunk)
            0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // source_node
            0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // target_node
            0x2A, 0x00, // vshard_id (42)
            0x04, 0x00, 0x00, 0x00, // payload_len
            b'd', b'a', b't', b'a', // payload
        ];
        assert_eq!(env.to_bytes(), expected);
    }

    #[test]
    fn wire_format_is_transport_agnostic() {
        // The same bytes work whether sent over RDMA or QUIC.
        // This test documents the invariant: no transport-specific branching.
        let env = VShardEnvelope::new(VShardMessageType::SegmentChunk, 1, 2, 42, b"data".to_vec());

        let rdma_bytes = env.to_bytes();
        let quic_bytes = env.to_bytes();
        assert_eq!(
            rdma_bytes, quic_bytes,
            "wire format must be transport-agnostic"
        );
    }
}
309}