nodedb_cluster/rpc_codec/
cluster_mgmt.rs1use super::discriminants::*;
6use super::header::write_frame;
7use super::raft_rpc::RaftRpc;
8use crate::error::{ClusterError, Result};
9
/// Fixed text that starts a "not the leader" rejection message.
///
/// NOTE(review): presumably the current leader's address is appended right
/// after the trailing space, and receivers strip this prefix to learn where to
/// retry — confirm against the code that builds and parses the message.
pub const LEADER_REDIRECT_PREFIX: &str = "not leader; retry at ";
13
14#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
16pub struct JoinRequest {
17 pub node_id: u64,
18 pub listen_addr: String,
19 pub wire_version: u16,
20 pub spiffe_id: Option<String>,
22 pub spki_pin: Option<Vec<u8>>,
25}
26
27#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
29pub struct JoinResponse {
30 pub success: bool,
31 pub error: String,
32 pub cluster_id: u64,
33 pub nodes: Vec<JoinNodeInfo>,
34 pub vshard_to_group: Vec<u64>,
35 pub groups: Vec<JoinGroupInfo>,
36}
37
38#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
40pub struct JoinNodeInfo {
41 pub node_id: u64,
42 pub addr: String,
43 pub state: u8,
44 pub raft_groups: Vec<u64>,
45 pub wire_version: u16,
46 pub spiffe_id: Option<String>,
48 pub spki_pin: Option<Vec<u8>>,
51}
52
53#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
55pub struct JoinGroupInfo {
56 pub group_id: u64,
57 pub leader: u64,
58 pub members: Vec<u64>,
59 pub learners: Vec<u64>,
60}
61
62#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
64pub struct PingRequest {
65 pub sender_id: u64,
66 pub topology_version: u64,
67}
68
69#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
71pub struct PongResponse {
72 pub responder_id: u64,
73 pub topology_version: u64,
74}
75
76#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
78pub struct TopologyUpdate {
79 pub version: u64,
80 pub nodes: Vec<JoinNodeInfo>,
81}
82
83#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
85pub struct TopologyAck {
86 pub responder_id: u64,
87 pub accepted_version: u64,
88}
89
/// Serializes `$msg` with rkyv and evaluates to `Result<Vec<u8>, ClusterError>`.
///
/// The buffer rkyv returns is copied into a plain `Vec<u8>`. A serializer
/// failure becomes `ClusterError::Codec` whose `detail` starts with
/// `"rkyv serialize: "`.
macro_rules! to_bytes {
    ($msg:expr) => {
        match rkyv::to_bytes::<rkyv::rancor::Error>($msg) {
            Ok(buf) => Ok(buf.to_vec()),
            Err(e) => Err(ClusterError::Codec {
                detail: format!("rkyv serialize: {e}"),
            }),
        }
    };
}
99
/// Deserializes `$payload` (a byte slice) into a `$T` with rkyv and evaluates
/// to `Result<$T, ClusterError>`.
///
/// * `$payload` — expression yielding `&[u8]`; it is evaluated exactly once.
/// * `$T` — the type to deserialize.
/// * `$name` — label interpolated into the error text.
///
/// A failure becomes `ClusterError::Codec` whose `detail` reads
/// `"rkyv deserialize <name>: <cause>"`.
macro_rules! from_bytes {
    ($payload:expr, $T:ty, $name:expr) => {{
        // Bind the argument first so an expression with side effects or
        // non-trivial cost is not expanded (and run) twice.
        let bytes: &[u8] = $payload;
        // The incoming slice carries no alignment guarantee, so the bytes are
        // copied into a 16-byte-aligned buffer before rkyv reads them.
        let mut aligned = rkyv::util::AlignedVec::<16>::with_capacity(bytes.len());
        aligned.extend_from_slice(bytes);
        rkyv::from_bytes::<$T, rkyv::rancor::Error>(&aligned).map_err(|e| ClusterError::Codec {
            detail: format!("rkyv deserialize {}: {e}", $name),
        })
    }};
}
109
110pub(super) fn encode_join_req(msg: &JoinRequest, out: &mut Vec<u8>) -> Result<()> {
111 write_frame(RPC_JOIN_REQ, &to_bytes!(msg)?, out)
112}
113pub(super) fn encode_join_resp(msg: &JoinResponse, out: &mut Vec<u8>) -> Result<()> {
114 write_frame(RPC_JOIN_RESP, &to_bytes!(msg)?, out)
115}
116pub(super) fn encode_ping(msg: &PingRequest, out: &mut Vec<u8>) -> Result<()> {
117 write_frame(RPC_PING, &to_bytes!(msg)?, out)
118}
119pub(super) fn encode_pong(msg: &PongResponse, out: &mut Vec<u8>) -> Result<()> {
120 write_frame(RPC_PONG, &to_bytes!(msg)?, out)
121}
122pub(super) fn encode_topology_update(msg: &TopologyUpdate, out: &mut Vec<u8>) -> Result<()> {
123 write_frame(RPC_TOPOLOGY_UPDATE, &to_bytes!(msg)?, out)
124}
125pub(super) fn encode_topology_ack(msg: &TopologyAck, out: &mut Vec<u8>) -> Result<()> {
126 write_frame(RPC_TOPOLOGY_ACK, &to_bytes!(msg)?, out)
127}
128
129pub(super) fn decode_join_req(payload: &[u8]) -> Result<RaftRpc> {
130 Ok(RaftRpc::JoinRequest(from_bytes!(
131 payload,
132 JoinRequest,
133 "JoinRequest"
134 )?))
135}
136pub(super) fn decode_join_resp(payload: &[u8]) -> Result<RaftRpc> {
137 Ok(RaftRpc::JoinResponse(from_bytes!(
138 payload,
139 JoinResponse,
140 "JoinResponse"
141 )?))
142}
143pub(super) fn decode_ping(payload: &[u8]) -> Result<RaftRpc> {
144 Ok(RaftRpc::Ping(from_bytes!(
145 payload,
146 PingRequest,
147 "PingRequest"
148 )?))
149}
150pub(super) fn decode_pong(payload: &[u8]) -> Result<RaftRpc> {
151 Ok(RaftRpc::Pong(from_bytes!(
152 payload,
153 PongResponse,
154 "PongResponse"
155 )?))
156}
157pub(super) fn decode_topology_update(payload: &[u8]) -> Result<RaftRpc> {
158 Ok(RaftRpc::TopologyUpdate(from_bytes!(
159 payload,
160 TopologyUpdate,
161 "TopologyUpdate"
162 )?))
163}
164pub(super) fn decode_topology_ack(payload: &[u8]) -> Result<RaftRpc> {
165 Ok(RaftRpc::TopologyAck(from_bytes!(
166 payload,
167 TopologyAck,
168 "TopologyAck"
169 )?))
170}
171
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `rpc` with the parent module's `encode` and immediately decodes
    /// the resulting bytes with its `decode`, panicking if either step fails.
    fn roundtrip(rpc: RaftRpc) -> RaftRpc {
        let encoded = super::super::encode(&rpc).unwrap();
        super::super::decode(&encoded).unwrap()
    }

    /// Every `JoinRequest` field, including the optional identity data and the
    /// wire version, must survive an encode/decode cycle.
    #[test]
    fn roundtrip_join_request() {
        let req = JoinRequest {
            node_id: 42,
            listen_addr: "10.0.0.5:9400".into(),
            wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION,
            spiffe_id: Some("spiffe://cluster.local/node/42".into()),
            spki_pin: Some(vec![0xabu8; 32]),
        };
        match roundtrip(RaftRpc::JoinRequest(req)) {
            RaftRpc::JoinRequest(d) => {
                assert_eq!(d.node_id, 42);
                assert_eq!(d.listen_addr, "10.0.0.5:9400");
                assert_eq!(
                    d.wire_version,
                    crate::topology::CLUSTER_WIRE_FORMAT_VERSION
                );
                assert_eq!(
                    d.spiffe_id.as_deref(),
                    Some("spiffe://cluster.local/node/42")
                );
                assert_eq!(d.spki_pin.as_deref(), Some([0xabu8; 32].as_ref()));
            }
            other => panic!("expected JoinRequest, got {other:?}"),
        }
    }

    /// Checks the contents — not just the lengths — of the nested collections
    /// carried by a `JoinResponse`.
    #[test]
    fn roundtrip_join_response() {
        let resp = JoinResponse {
            success: true,
            error: String::new(),
            cluster_id: 12345,
            nodes: vec![JoinNodeInfo {
                node_id: 1,
                addr: "10.0.0.1:9400".into(),
                state: 1,
                raft_groups: vec![0, 1],
                wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION,
                spiffe_id: None,
                spki_pin: None,
            }],
            vshard_to_group: (0..1024u64).map(|i| i % 4).collect(),
            groups: vec![JoinGroupInfo {
                group_id: 0,
                leader: 1,
                members: vec![1],
                learners: vec![],
            }],
        };
        match roundtrip(RaftRpc::JoinResponse(resp)) {
            RaftRpc::JoinResponse(d) => {
                assert!(d.success);
                assert!(d.error.is_empty());
                assert_eq!(d.cluster_id, 12345);

                assert_eq!(d.nodes.len(), 1);
                let node = &d.nodes[0];
                assert_eq!(node.node_id, 1);
                assert_eq!(node.addr, "10.0.0.1:9400");
                assert_eq!(node.state, 1);
                assert_eq!(node.raft_groups, vec![0, 1]);
                assert_eq!(
                    node.wire_version,
                    crate::topology::CLUSTER_WIRE_FORMAT_VERSION
                );
                assert!(node.spiffe_id.is_none());
                assert!(node.spki_pin.is_none());

                assert_eq!(d.vshard_to_group.len(), 1024);
                let expected_map: Vec<u64> = (0..1024u64).map(|i| i % 4).collect();
                assert_eq!(d.vshard_to_group, expected_map);

                assert_eq!(d.groups.len(), 1);
                let group = &d.groups[0];
                assert_eq!(group.group_id, 0);
                assert_eq!(group.leader, 1);
                assert_eq!(group.members, vec![1]);
                assert!(group.learners.is_empty());
            }
            other => panic!("expected JoinResponse, got {other:?}"),
        }
    }

    /// Ping and Pong bodies must come back under their own variants with both
    /// fields intact.
    #[test]
    fn roundtrip_ping_pong() {
        let ping = PingRequest {
            sender_id: 7,
            topology_version: 3,
        };
        match roundtrip(RaftRpc::Ping(ping)) {
            RaftRpc::Ping(d) => {
                assert_eq!(d.sender_id, 7);
                assert_eq!(d.topology_version, 3);
            }
            other => panic!("expected Ping, got {other:?}"),
        }

        let pong = PongResponse {
            responder_id: 8,
            topology_version: 4,
        };
        match roundtrip(RaftRpc::Pong(pong)) {
            RaftRpc::Pong(d) => {
                assert_eq!(d.responder_id, 8);
                assert_eq!(d.topology_version, 4);
            }
            other => panic!("expected Pong, got {other:?}"),
        }
    }

    /// Topology updates (with populated optional identity fields) and their
    /// acknowledgements must round-trip unchanged.
    #[test]
    fn roundtrip_topology_update_and_ack() {
        let update = TopologyUpdate {
            version: 9,
            nodes: vec![JoinNodeInfo {
                node_id: 2,
                addr: "10.0.0.2:9400".into(),
                state: 1,
                raft_groups: vec![3],
                wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION,
                spiffe_id: Some("spiffe://cluster.local/node/2".into()),
                spki_pin: Some(vec![0x11u8; 32]),
            }],
        };
        match roundtrip(RaftRpc::TopologyUpdate(update)) {
            RaftRpc::TopologyUpdate(d) => {
                assert_eq!(d.version, 9);
                assert_eq!(d.nodes.len(), 1);
                let node = &d.nodes[0];
                assert_eq!(node.node_id, 2);
                assert_eq!(node.addr, "10.0.0.2:9400");
                assert_eq!(node.raft_groups, vec![3]);
                assert_eq!(
                    node.spiffe_id.as_deref(),
                    Some("spiffe://cluster.local/node/2")
                );
                assert_eq!(node.spki_pin.as_deref(), Some([0x11u8; 32].as_ref()));
            }
            other => panic!("expected TopologyUpdate, got {other:?}"),
        }

        let ack = TopologyAck {
            responder_id: 2,
            accepted_version: 9,
        };
        match roundtrip(RaftRpc::TopologyAck(ack)) {
            RaftRpc::TopologyAck(d) => {
                assert_eq!(d.responder_id, 2);
                assert_eq!(d.accepted_version, 9);
            }
            other => panic!("expected TopologyAck, got {other:?}"),
        }
    }
}