Skip to main content

nodedb_cluster/rpc_codec/
cluster_mgmt.rs

1//! Cluster management wire types and codecs.
2
3use super::discriminants::*;
4use super::header::write_frame;
5use super::raft_rpc::RaftRpc;
6use crate::error::{ClusterError, Result};
7
8/// Wire-level redirect contract between the join-flow producer
9/// and the client-side parser.
10pub const LEADER_REDIRECT_PREFIX: &str = "not leader; retry at ";
11
12/// Request to join an existing cluster.
13#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
14pub struct JoinRequest {
15    pub node_id: u64,
16    pub listen_addr: String,
17    pub wire_version: u16,
18}
19
20/// Response to a join request — carries full cluster state.
21#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
22pub struct JoinResponse {
23    pub success: bool,
24    pub error: String,
25    pub cluster_id: u64,
26    pub nodes: Vec<JoinNodeInfo>,
27    pub vshard_to_group: Vec<u64>,
28    pub groups: Vec<JoinGroupInfo>,
29}
30
31/// Node info in the join response wire format.
32#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
33pub struct JoinNodeInfo {
34    pub node_id: u64,
35    pub addr: String,
36    pub state: u8,
37    pub raft_groups: Vec<u64>,
38    pub wire_version: u16,
39}
40
41/// Raft group membership in the join response wire format.
42#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
43pub struct JoinGroupInfo {
44    pub group_id: u64,
45    pub leader: u64,
46    pub members: Vec<u64>,
47    pub learners: Vec<u64>,
48}
49
50/// Health check ping.
51#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
52pub struct PingRequest {
53    pub sender_id: u64,
54    pub topology_version: u64,
55}
56
57/// Health check pong.
58#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
59pub struct PongResponse {
60    pub responder_id: u64,
61    pub topology_version: u64,
62}
63
64/// Push topology update to a peer.
65#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
66pub struct TopologyUpdate {
67    pub version: u64,
68    pub nodes: Vec<JoinNodeInfo>,
69}
70
71/// Acknowledgement of a topology update.
72#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
73pub struct TopologyAck {
74    pub responder_id: u64,
75    pub accepted_version: u64,
76}
77
78macro_rules! to_bytes {
79    ($msg:expr) => {
80        rkyv::to_bytes::<rkyv::rancor::Error>($msg)
81            .map(|b| b.to_vec())
82            .map_err(|e| ClusterError::Codec {
83                detail: format!("rkyv serialize: {e}"),
84            })
85    };
86}
87
88macro_rules! from_bytes {
89    ($payload:expr, $T:ty, $name:expr) => {{
90        let mut aligned = rkyv::util::AlignedVec::<16>::with_capacity($payload.len());
91        aligned.extend_from_slice($payload);
92        rkyv::from_bytes::<$T, rkyv::rancor::Error>(&aligned).map_err(|e| ClusterError::Codec {
93            detail: format!("rkyv deserialize {}: {e}", $name),
94        })
95    }};
96}
97
98pub(super) fn encode_join_req(msg: &JoinRequest, out: &mut Vec<u8>) -> Result<()> {
99    write_frame(RPC_JOIN_REQ, &to_bytes!(msg)?, out)
100}
101pub(super) fn encode_join_resp(msg: &JoinResponse, out: &mut Vec<u8>) -> Result<()> {
102    write_frame(RPC_JOIN_RESP, &to_bytes!(msg)?, out)
103}
104pub(super) fn encode_ping(msg: &PingRequest, out: &mut Vec<u8>) -> Result<()> {
105    write_frame(RPC_PING, &to_bytes!(msg)?, out)
106}
107pub(super) fn encode_pong(msg: &PongResponse, out: &mut Vec<u8>) -> Result<()> {
108    write_frame(RPC_PONG, &to_bytes!(msg)?, out)
109}
110pub(super) fn encode_topology_update(msg: &TopologyUpdate, out: &mut Vec<u8>) -> Result<()> {
111    write_frame(RPC_TOPOLOGY_UPDATE, &to_bytes!(msg)?, out)
112}
113pub(super) fn encode_topology_ack(msg: &TopologyAck, out: &mut Vec<u8>) -> Result<()> {
114    write_frame(RPC_TOPOLOGY_ACK, &to_bytes!(msg)?, out)
115}
116
117pub(super) fn decode_join_req(payload: &[u8]) -> Result<RaftRpc> {
118    Ok(RaftRpc::JoinRequest(from_bytes!(
119        payload,
120        JoinRequest,
121        "JoinRequest"
122    )?))
123}
124pub(super) fn decode_join_resp(payload: &[u8]) -> Result<RaftRpc> {
125    Ok(RaftRpc::JoinResponse(from_bytes!(
126        payload,
127        JoinResponse,
128        "JoinResponse"
129    )?))
130}
131pub(super) fn decode_ping(payload: &[u8]) -> Result<RaftRpc> {
132    Ok(RaftRpc::Ping(from_bytes!(
133        payload,
134        PingRequest,
135        "PingRequest"
136    )?))
137}
138pub(super) fn decode_pong(payload: &[u8]) -> Result<RaftRpc> {
139    Ok(RaftRpc::Pong(from_bytes!(
140        payload,
141        PongResponse,
142        "PongResponse"
143    )?))
144}
145pub(super) fn decode_topology_update(payload: &[u8]) -> Result<RaftRpc> {
146    Ok(RaftRpc::TopologyUpdate(from_bytes!(
147        payload,
148        TopologyUpdate,
149        "TopologyUpdate"
150    )?))
151}
152pub(super) fn decode_topology_ack(payload: &[u8]) -> Result<RaftRpc> {
153    Ok(RaftRpc::TopologyAck(from_bytes!(
154        payload,
155        TopologyAck,
156        "TopologyAck"
157    )?))
158}
159
160#[cfg(test)]
161mod tests {
162    use super::*;
163
164    fn roundtrip(rpc: RaftRpc) -> RaftRpc {
165        let encoded = super::super::encode(&rpc).unwrap();
166        super::super::decode(&encoded).unwrap()
167    }
168
169    #[test]
170    fn roundtrip_join_request() {
171        let req = JoinRequest {
172            node_id: 42,
173            listen_addr: "10.0.0.5:9400".into(),
174            wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION,
175        };
176        match roundtrip(RaftRpc::JoinRequest(req)) {
177            RaftRpc::JoinRequest(d) => {
178                assert_eq!(d.node_id, 42);
179                assert_eq!(d.listen_addr, "10.0.0.5:9400");
180            }
181            other => panic!("expected JoinRequest, got {other:?}"),
182        }
183    }
184
185    #[test]
186    fn roundtrip_join_response() {
187        let resp = JoinResponse {
188            success: true,
189            error: String::new(),
190            cluster_id: 12345,
191            nodes: vec![JoinNodeInfo {
192                node_id: 1,
193                addr: "10.0.0.1:9400".into(),
194                state: 1,
195                raft_groups: vec![0, 1],
196                wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION,
197            }],
198            vshard_to_group: (0..1024u64).map(|i| i % 4).collect(),
199            groups: vec![JoinGroupInfo {
200                group_id: 0,
201                leader: 1,
202                members: vec![1],
203                learners: vec![],
204            }],
205        };
206        match roundtrip(RaftRpc::JoinResponse(resp)) {
207            RaftRpc::JoinResponse(d) => {
208                assert!(d.success);
209                assert_eq!(d.nodes.len(), 1);
210                assert_eq!(d.vshard_to_group.len(), 1024);
211            }
212            other => panic!("expected JoinResponse, got {other:?}"),
213        }
214    }
215}