Skip to main content

nodedb_cluster/rpc_codec/
cluster_mgmt.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Cluster management wire types and codecs.
4
5use super::discriminants::*;
6use super::header::write_frame;
7use super::raft_rpc::RaftRpc;
8use crate::error::{ClusterError, Result};
9
/// Fixed text prefix that marks a "not the leader" redirect on the wire.
///
/// The join-flow producer and the client-side parser both rely on this exact
/// string, so it must be changed on both sides together.
// NOTE(review): the trailing "retry at " suggests the address to retry follows
// the prefix; confirm against the producer.
pub const LEADER_REDIRECT_PREFIX: &str = "not leader; retry at ";
13
14/// Request to join an existing cluster.
15#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
16pub struct JoinRequest {
17    pub node_id: u64,
18    pub listen_addr: String,
19    pub wire_version: u16,
20    /// SPIFFE URI SAN from the joiner's mTLS leaf certificate, if present.
21    pub spiffe_id: Option<String>,
22    /// SHA-256 SPKI fingerprint of the joiner's mTLS leaf certificate.
23    /// Stored as `Vec<u8>` for rkyv compatibility; always 32 bytes when present.
24    pub spki_pin: Option<Vec<u8>>,
25}
26
27/// Response to a join request — carries full cluster state.
28#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
29pub struct JoinResponse {
30    pub success: bool,
31    pub error: String,
32    pub cluster_id: u64,
33    pub nodes: Vec<JoinNodeInfo>,
34    pub vshard_to_group: Vec<u64>,
35    pub groups: Vec<JoinGroupInfo>,
36}
37
38/// Node info in the join response wire format.
39#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
40pub struct JoinNodeInfo {
41    pub node_id: u64,
42    pub addr: String,
43    pub state: u8,
44    pub raft_groups: Vec<u64>,
45    pub wire_version: u16,
46    /// SPIFFE URI SAN for this node, if known.
47    pub spiffe_id: Option<String>,
48    /// SHA-256 SPKI fingerprint for this node.
49    /// Stored as `Vec<u8>` for rkyv compatibility; always 32 bytes when present.
50    pub spki_pin: Option<Vec<u8>>,
51}
52
53/// Raft group membership in the join response wire format.
54#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
55pub struct JoinGroupInfo {
56    pub group_id: u64,
57    pub leader: u64,
58    pub members: Vec<u64>,
59    pub learners: Vec<u64>,
60}
61
62/// Health check ping.
63#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
64pub struct PingRequest {
65    pub sender_id: u64,
66    pub topology_version: u64,
67}
68
69/// Health check pong.
70#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
71pub struct PongResponse {
72    pub responder_id: u64,
73    pub topology_version: u64,
74}
75
76/// Push topology update to a peer.
77#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
78pub struct TopologyUpdate {
79    pub version: u64,
80    pub nodes: Vec<JoinNodeInfo>,
81}
82
83/// Acknowledgement of a topology update.
84#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
85pub struct TopologyAck {
86    pub responder_id: u64,
87    pub accepted_version: u64,
88}
89
// Serializes `$msg` with rkyv and yields `Result<Vec<u8>, ClusterError>`.
// The rkyv output buffer is copied into a plain `Vec<u8>`; a serialization
// failure becomes `ClusterError::Codec` with the rkyv error text in `detail`.
macro_rules! to_bytes {
    ($msg:expr) => {
        match rkyv::to_bytes::<rkyv::rancor::Error>($msg) {
            Ok(buf) => Ok(buf.to_vec()),
            Err(e) => Err(ClusterError::Codec {
                detail: format!("rkyv serialize: {e}"),
            }),
        }
    };
}
99
// Deserializes `$payload` into `$T` with rkyv and yields `Result<$T, ClusterError>`.
// The bytes are first copied into a 16-byte-aligned `AlignedVec`
// (presumably because the incoming slice carries no alignment guarantee).
// `$name` only labels the error text when deserialization fails.
macro_rules! from_bytes {
    ($payload:expr, $T:ty, $name:expr) => {{
        let mut scratch = rkyv::util::AlignedVec::<16>::with_capacity($payload.len());
        scratch.extend_from_slice($payload);
        match rkyv::from_bytes::<$T, rkyv::rancor::Error>(&scratch) {
            Ok(value) => Ok(value),
            Err(e) => Err(ClusterError::Codec {
                detail: format!("rkyv deserialize {}: {e}", $name),
            }),
        }
    }};
}
109
110pub(super) fn encode_join_req(msg: &JoinRequest, out: &mut Vec<u8>) -> Result<()> {
111    write_frame(RPC_JOIN_REQ, &to_bytes!(msg)?, out)
112}
113pub(super) fn encode_join_resp(msg: &JoinResponse, out: &mut Vec<u8>) -> Result<()> {
114    write_frame(RPC_JOIN_RESP, &to_bytes!(msg)?, out)
115}
116pub(super) fn encode_ping(msg: &PingRequest, out: &mut Vec<u8>) -> Result<()> {
117    write_frame(RPC_PING, &to_bytes!(msg)?, out)
118}
119pub(super) fn encode_pong(msg: &PongResponse, out: &mut Vec<u8>) -> Result<()> {
120    write_frame(RPC_PONG, &to_bytes!(msg)?, out)
121}
122pub(super) fn encode_topology_update(msg: &TopologyUpdate, out: &mut Vec<u8>) -> Result<()> {
123    write_frame(RPC_TOPOLOGY_UPDATE, &to_bytes!(msg)?, out)
124}
125pub(super) fn encode_topology_ack(msg: &TopologyAck, out: &mut Vec<u8>) -> Result<()> {
126    write_frame(RPC_TOPOLOGY_ACK, &to_bytes!(msg)?, out)
127}
128
129pub(super) fn decode_join_req(payload: &[u8]) -> Result<RaftRpc> {
130    Ok(RaftRpc::JoinRequest(from_bytes!(
131        payload,
132        JoinRequest,
133        "JoinRequest"
134    )?))
135}
136pub(super) fn decode_join_resp(payload: &[u8]) -> Result<RaftRpc> {
137    Ok(RaftRpc::JoinResponse(from_bytes!(
138        payload,
139        JoinResponse,
140        "JoinResponse"
141    )?))
142}
143pub(super) fn decode_ping(payload: &[u8]) -> Result<RaftRpc> {
144    Ok(RaftRpc::Ping(from_bytes!(
145        payload,
146        PingRequest,
147        "PingRequest"
148    )?))
149}
150pub(super) fn decode_pong(payload: &[u8]) -> Result<RaftRpc> {
151    Ok(RaftRpc::Pong(from_bytes!(
152        payload,
153        PongResponse,
154        "PongResponse"
155    )?))
156}
157pub(super) fn decode_topology_update(payload: &[u8]) -> Result<RaftRpc> {
158    Ok(RaftRpc::TopologyUpdate(from_bytes!(
159        payload,
160        TopologyUpdate,
161        "TopologyUpdate"
162    )?))
163}
164pub(super) fn decode_topology_ack(payload: &[u8]) -> Result<RaftRpc> {
165    Ok(RaftRpc::TopologyAck(from_bytes!(
166        payload,
167        TopologyAck,
168        "TopologyAck"
169    )?))
170}
171
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `rpc` through the parent codec, then decodes the produced bytes.
    /// Panics if either step fails, which fails the calling test.
    fn roundtrip(rpc: RaftRpc) -> RaftRpc {
        let encoded = super::super::encode(&rpc).unwrap();
        super::super::decode(&encoded).unwrap()
    }

    /// Builds the node entry shared by the join-response and topology tests.
    fn sample_node() -> JoinNodeInfo {
        JoinNodeInfo {
            node_id: 1,
            addr: "10.0.0.1:9400".into(),
            state: 1,
            raft_groups: vec![0, 1],
            wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION,
            spiffe_id: None,
            spki_pin: None,
        }
    }

    /// Checks every field of a decoded node against `sample_node()`.
    fn assert_sample_node(node: &JoinNodeInfo) {
        assert_eq!(node.node_id, 1);
        assert_eq!(node.addr, "10.0.0.1:9400");
        assert_eq!(node.state, 1);
        assert_eq!(node.raft_groups, vec![0, 1]);
        assert_eq!(
            node.wire_version,
            crate::topology::CLUSTER_WIRE_FORMAT_VERSION
        );
        assert!(node.spiffe_id.is_none());
        assert!(node.spki_pin.is_none());
    }

    #[test]
    fn roundtrip_join_request() {
        let req = JoinRequest {
            node_id: 42,
            listen_addr: "10.0.0.5:9400".into(),
            wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION,
            spiffe_id: Some("spiffe://cluster.local/node/42".into()),
            spki_pin: Some(vec![0xabu8; 32]),
        };
        match roundtrip(RaftRpc::JoinRequest(req)) {
            RaftRpc::JoinRequest(d) => {
                assert_eq!(d.node_id, 42);
                assert_eq!(d.listen_addr, "10.0.0.5:9400");
                assert_eq!(
                    d.wire_version,
                    crate::topology::CLUSTER_WIRE_FORMAT_VERSION
                );
                assert_eq!(
                    d.spiffe_id.as_deref(),
                    Some("spiffe://cluster.local/node/42")
                );
                assert_eq!(d.spki_pin.as_deref(), Some([0xabu8; 32].as_ref()));
            }
            other => panic!("expected JoinRequest, got {other:?}"),
        }
    }

    // The optional identity fields must survive as `None`, not as empty values.
    #[test]
    fn roundtrip_join_request_without_identity() {
        let req = JoinRequest {
            node_id: 7,
            listen_addr: "10.0.0.7:9400".into(),
            wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION,
            spiffe_id: None,
            spki_pin: None,
        };
        match roundtrip(RaftRpc::JoinRequest(req)) {
            RaftRpc::JoinRequest(d) => {
                assert_eq!(d.node_id, 7);
                assert_eq!(d.listen_addr, "10.0.0.7:9400");
                assert!(d.spiffe_id.is_none());
                assert!(d.spki_pin.is_none());
            }
            other => panic!("expected JoinRequest, got {other:?}"),
        }
    }

    #[test]
    fn roundtrip_join_response() {
        let vshards: Vec<u64> = (0..1024u64).map(|i| i % 4).collect();
        let resp = JoinResponse {
            success: true,
            error: String::new(),
            cluster_id: 12345,
            nodes: vec![sample_node()],
            vshard_to_group: vshards.clone(),
            groups: vec![JoinGroupInfo {
                group_id: 0,
                leader: 1,
                members: vec![1],
                learners: vec![],
            }],
        };
        match roundtrip(RaftRpc::JoinResponse(resp)) {
            RaftRpc::JoinResponse(d) => {
                assert!(d.success);
                assert!(d.error.is_empty());
                assert_eq!(d.cluster_id, 12345);
                assert_eq!(d.nodes.len(), 1);
                assert_sample_node(&d.nodes[0]);
                assert_eq!(d.vshard_to_group, vshards);
                assert_eq!(d.groups.len(), 1);
                assert_eq!(d.groups[0].group_id, 0);
                assert_eq!(d.groups[0].leader, 1);
                assert_eq!(d.groups[0].members, vec![1]);
                assert!(d.groups[0].learners.is_empty());
            }
            other => panic!("expected JoinResponse, got {other:?}"),
        }
    }

    #[test]
    fn roundtrip_ping() {
        let ping = PingRequest {
            sender_id: 7,
            topology_version: 3,
        };
        match roundtrip(RaftRpc::Ping(ping)) {
            RaftRpc::Ping(d) => {
                assert_eq!(d.sender_id, 7);
                assert_eq!(d.topology_version, 3);
            }
            other => panic!("expected Ping, got {other:?}"),
        }
    }

    #[test]
    fn roundtrip_pong() {
        let pong = PongResponse {
            responder_id: 9,
            topology_version: 4,
        };
        match roundtrip(RaftRpc::Pong(pong)) {
            RaftRpc::Pong(d) => {
                assert_eq!(d.responder_id, 9);
                assert_eq!(d.topology_version, 4);
            }
            other => panic!("expected Pong, got {other:?}"),
        }
    }

    #[test]
    fn roundtrip_topology_update() {
        let update = TopologyUpdate {
            version: 11,
            nodes: vec![sample_node()],
        };
        match roundtrip(RaftRpc::TopologyUpdate(update)) {
            RaftRpc::TopologyUpdate(d) => {
                assert_eq!(d.version, 11);
                assert_eq!(d.nodes.len(), 1);
                assert_sample_node(&d.nodes[0]);
            }
            other => panic!("expected TopologyUpdate, got {other:?}"),
        }
    }

    #[test]
    fn roundtrip_topology_ack() {
        let ack = TopologyAck {
            responder_id: 2,
            accepted_version: 11,
        };
        match roundtrip(RaftRpc::TopologyAck(ack)) {
            RaftRpc::TopologyAck(d) => {
                assert_eq!(d.responder_id, 2);
                assert_eq!(d.accepted_version, 11);
            }
            other => panic!("expected TopologyAck, got {other:?}"),
        }
    }
}
236}