nodedb_cluster/rpc_codec/
cluster_mgmt.rs1use super::discriminants::*;
4use super::header::write_frame;
5use super::raft_rpc::RaftRpc;
6use crate::error::{ClusterError, Result};
7
8pub const LEADER_REDIRECT_PREFIX: &str = "not leader; retry at ";
11
12#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
14pub struct JoinRequest {
15 pub node_id: u64,
16 pub listen_addr: String,
17 pub wire_version: u16,
18}
19
20#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
22pub struct JoinResponse {
23 pub success: bool,
24 pub error: String,
25 pub cluster_id: u64,
26 pub nodes: Vec<JoinNodeInfo>,
27 pub vshard_to_group: Vec<u64>,
28 pub groups: Vec<JoinGroupInfo>,
29}
30
31#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
33pub struct JoinNodeInfo {
34 pub node_id: u64,
35 pub addr: String,
36 pub state: u8,
37 pub raft_groups: Vec<u64>,
38 pub wire_version: u16,
39}
40
41#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
43pub struct JoinGroupInfo {
44 pub group_id: u64,
45 pub leader: u64,
46 pub members: Vec<u64>,
47 pub learners: Vec<u64>,
48}
49
50#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
52pub struct PingRequest {
53 pub sender_id: u64,
54 pub topology_version: u64,
55}
56
57#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
59pub struct PongResponse {
60 pub responder_id: u64,
61 pub topology_version: u64,
62}
63
64#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
66pub struct TopologyUpdate {
67 pub version: u64,
68 pub nodes: Vec<JoinNodeInfo>,
69}
70
71#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
73pub struct TopologyAck {
74 pub responder_id: u64,
75 pub accepted_version: u64,
76}
77
78macro_rules! to_bytes {
79 ($msg:expr) => {
80 rkyv::to_bytes::<rkyv::rancor::Error>($msg)
81 .map(|b| b.to_vec())
82 .map_err(|e| ClusterError::Codec {
83 detail: format!("rkyv serialize: {e}"),
84 })
85 };
86}
87
88macro_rules! from_bytes {
89 ($payload:expr, $T:ty, $name:expr) => {{
90 let mut aligned = rkyv::util::AlignedVec::<16>::with_capacity($payload.len());
91 aligned.extend_from_slice($payload);
92 rkyv::from_bytes::<$T, rkyv::rancor::Error>(&aligned).map_err(|e| ClusterError::Codec {
93 detail: format!("rkyv deserialize {}: {e}", $name),
94 })
95 }};
96}
97
98pub(super) fn encode_join_req(msg: &JoinRequest, out: &mut Vec<u8>) -> Result<()> {
99 write_frame(RPC_JOIN_REQ, &to_bytes!(msg)?, out)
100}
101pub(super) fn encode_join_resp(msg: &JoinResponse, out: &mut Vec<u8>) -> Result<()> {
102 write_frame(RPC_JOIN_RESP, &to_bytes!(msg)?, out)
103}
104pub(super) fn encode_ping(msg: &PingRequest, out: &mut Vec<u8>) -> Result<()> {
105 write_frame(RPC_PING, &to_bytes!(msg)?, out)
106}
107pub(super) fn encode_pong(msg: &PongResponse, out: &mut Vec<u8>) -> Result<()> {
108 write_frame(RPC_PONG, &to_bytes!(msg)?, out)
109}
110pub(super) fn encode_topology_update(msg: &TopologyUpdate, out: &mut Vec<u8>) -> Result<()> {
111 write_frame(RPC_TOPOLOGY_UPDATE, &to_bytes!(msg)?, out)
112}
113pub(super) fn encode_topology_ack(msg: &TopologyAck, out: &mut Vec<u8>) -> Result<()> {
114 write_frame(RPC_TOPOLOGY_ACK, &to_bytes!(msg)?, out)
115}
116
117pub(super) fn decode_join_req(payload: &[u8]) -> Result<RaftRpc> {
118 Ok(RaftRpc::JoinRequest(from_bytes!(
119 payload,
120 JoinRequest,
121 "JoinRequest"
122 )?))
123}
124pub(super) fn decode_join_resp(payload: &[u8]) -> Result<RaftRpc> {
125 Ok(RaftRpc::JoinResponse(from_bytes!(
126 payload,
127 JoinResponse,
128 "JoinResponse"
129 )?))
130}
131pub(super) fn decode_ping(payload: &[u8]) -> Result<RaftRpc> {
132 Ok(RaftRpc::Ping(from_bytes!(
133 payload,
134 PingRequest,
135 "PingRequest"
136 )?))
137}
138pub(super) fn decode_pong(payload: &[u8]) -> Result<RaftRpc> {
139 Ok(RaftRpc::Pong(from_bytes!(
140 payload,
141 PongResponse,
142 "PongResponse"
143 )?))
144}
145pub(super) fn decode_topology_update(payload: &[u8]) -> Result<RaftRpc> {
146 Ok(RaftRpc::TopologyUpdate(from_bytes!(
147 payload,
148 TopologyUpdate,
149 "TopologyUpdate"
150 )?))
151}
152pub(super) fn decode_topology_ack(payload: &[u8]) -> Result<RaftRpc> {
153 Ok(RaftRpc::TopologyAck(from_bytes!(
154 payload,
155 TopologyAck,
156 "TopologyAck"
157 )?))
158}
159
160#[cfg(test)]
161mod tests {
162 use super::*;
163
164 fn roundtrip(rpc: RaftRpc) -> RaftRpc {
165 let encoded = super::super::encode(&rpc).unwrap();
166 super::super::decode(&encoded).unwrap()
167 }
168
169 #[test]
170 fn roundtrip_join_request() {
171 let req = JoinRequest {
172 node_id: 42,
173 listen_addr: "10.0.0.5:9400".into(),
174 wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION,
175 };
176 match roundtrip(RaftRpc::JoinRequest(req)) {
177 RaftRpc::JoinRequest(d) => {
178 assert_eq!(d.node_id, 42);
179 assert_eq!(d.listen_addr, "10.0.0.5:9400");
180 }
181 other => panic!("expected JoinRequest, got {other:?}"),
182 }
183 }
184
185 #[test]
186 fn roundtrip_join_response() {
187 let resp = JoinResponse {
188 success: true,
189 error: String::new(),
190 cluster_id: 12345,
191 nodes: vec![JoinNodeInfo {
192 node_id: 1,
193 addr: "10.0.0.1:9400".into(),
194 state: 1,
195 raft_groups: vec![0, 1],
196 wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION,
197 }],
198 vshard_to_group: (0..1024u64).map(|i| i % 4).collect(),
199 groups: vec![JoinGroupInfo {
200 group_id: 0,
201 leader: 1,
202 members: vec![1],
203 learners: vec![],
204 }],
205 };
206 match roundtrip(RaftRpc::JoinResponse(resp)) {
207 RaftRpc::JoinResponse(d) => {
208 assert!(d.success);
209 assert_eq!(d.nodes.len(), 1);
210 assert_eq!(d.vshard_to_group.len(), 1024);
211 }
212 other => panic!("expected JoinResponse, got {other:?}"),
213 }
214 }
215}