nodedb_cluster/raft_loop/proposals.rs
1// SPDX-License-Identifier: BUSL-1.1
2
3//! Raft proposal API — local and leader-forwarded proposals for both
4//! the metadata group (group 0) and data groups.
5
6use crate::conf_change::ConfChange;
7use crate::error::Result;
8
9use super::loop_core::{CommitApplier, RaftLoop};
10use crate::forward::PlanExecutor;
11
12impl<A: CommitApplier, P: PlanExecutor> RaftLoop<A, P> {
13 /// Propose a command to the Raft group owning the given vShard.
14 ///
15 /// Returns `(group_id, log_index)` on success.
16 pub fn propose(&self, vshard_id: u32, data: Vec<u8>) -> Result<(u64, u64)> {
17 let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
18 mr.propose(vshard_id, data)
19 }
20
21 /// Propose a command directly to the metadata Raft group (group 0).
22 ///
23 /// Used by the host crate's metadata proposer and by integration
24 /// tests that exercise the replicated-catalog path without a
25 /// pgwire client. Fails with `ClusterError::GroupNotFound` if
26 /// group 0 does not exist on this node, and with
27 /// `ClusterError::Raft(NotLeader)` if this node is not the
28 /// current leader of group 0.
29 pub fn propose_to_metadata_group(&self, data: Vec<u8>) -> Result<u64> {
30 let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
31 mr.propose_to_group(crate::metadata_group::METADATA_GROUP_ID, data)
32 }
33
34 /// Propose to the metadata Raft group, transparently forwarding
35 /// to the current leader if this node is not it.
36 ///
37 /// Tries a local propose first. On
38 /// `ClusterError::Raft(NotLeader { leader_hint })`, looks up the
39 /// hinted leader's address in cluster topology and sends a
40 /// [`crate::rpc_codec::MetadataProposeRequest`] over QUIC. The
41 /// receiving leader applies the proposal locally and returns
42 /// the log index.
43 ///
44 /// On `NotLeader { leader_hint: None }` (election in progress,
45 /// no observed leader yet) the call returns the original
46 /// `NotLeader` error so the caller can decide whether to retry.
47 /// We deliberately do not implement a wait-and-retry loop here
48 /// because the caller (the host-side proposer) may have a
49 /// shorter deadline than any reasonable retry budget.
50 ///
51 /// The leader-side path through this function is identical to
52 /// the bare `propose_to_metadata_group` — the only extra cost is
53 /// an `is_leader_locally` check before the local propose.
54 pub async fn propose_to_metadata_group_via_leader(&self, data: Vec<u8>) -> Result<u64> {
55 // First, try a local propose.
56 match self.propose_to_metadata_group(data.clone()) {
57 Ok(idx) => Ok(idx),
58 Err(crate::error::ClusterError::Raft(nodedb_raft::RaftError::NotLeader {
59 leader_hint,
60 })) => {
61 let Some(leader_id) = leader_hint else {
62 return Err(crate::error::ClusterError::Raft(
63 nodedb_raft::RaftError::NotLeader { leader_hint: None },
64 ));
65 };
66 if leader_id == self.node_id {
67 // Should not happen — local propose said we
68 // weren't leader but the hint points at us. Fall
69 // through to the original error so the caller
70 // sees the contradiction.
71 return Err(crate::error::ClusterError::Raft(
72 nodedb_raft::RaftError::NotLeader {
73 leader_hint: Some(leader_id),
74 },
75 ));
76 }
77 // Otherwise forward to the hinted leader.
78 self.forward_metadata_propose(leader_id, data).await
79 }
80 Err(other) => Err(other),
81 }
82 }
83
84 /// Send a `MetadataProposeRequest` to `leader_id`. Looks up the
85 /// leader's listen address via the local topology snapshot and
86 /// dispatches through the existing peer transport.
87 async fn forward_metadata_propose(&self, leader_id: u64, data: Vec<u8>) -> Result<u64> {
88 // Resolve and register the leader's address with the
89 // transport so `send_rpc` has a destination. Topology is
90 // updated by the membership / health subsystem; if the
91 // leader isn't in our local topology yet we fail loudly so
92 // the caller can fall back to its own retry policy rather
93 // than silently dropping the proposal.
94 {
95 let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
96 let Some(node) = topo.get_node(leader_id) else {
97 return Err(crate::error::ClusterError::Transport {
98 detail: format!(
99 "metadata propose forward: leader {leader_id} not in local topology"
100 ),
101 });
102 };
103 let Some(addr) = node.socket_addr() else {
104 return Err(crate::error::ClusterError::Transport {
105 detail: format!(
106 "metadata propose forward: leader {leader_id} has unparseable addr {:?}",
107 node.addr
108 ),
109 });
110 };
111 // Idempotent: register_peer overwrites any prior mapping.
112 self.transport.register_peer(leader_id, addr);
113 }
114
115 let req = crate::rpc_codec::RaftRpc::MetadataProposeRequest(
116 crate::rpc_codec::MetadataProposeRequest { bytes: data },
117 );
118 let resp = self.transport.send_rpc(leader_id, req).await?;
119 match resp {
120 crate::rpc_codec::RaftRpc::MetadataProposeResponse(r) => {
121 if r.success {
122 Ok(r.log_index)
123 } else if let Some(hint) = r.leader_hint {
124 // The receiving node was also not the leader
125 // (rare: leader changed between our local check
126 // and the forwarded RPC). Surface as NotLeader
127 // so the caller's normal retry path runs.
128 Err(crate::error::ClusterError::Raft(
129 nodedb_raft::RaftError::NotLeader {
130 leader_hint: Some(hint),
131 },
132 ))
133 } else {
134 Err(crate::error::ClusterError::Transport {
135 detail: format!("metadata propose forward failed: {}", r.error_message),
136 })
137 }
138 }
139 other => Err(crate::error::ClusterError::Transport {
140 detail: format!("metadata propose forward: unexpected response variant {other:?}"),
141 }),
142 }
143 }
144
145 /// Propose a command to the data Raft group owning the given vShard,
146 /// transparently forwarding to the group leader if this node is not it.
147 ///
148 /// Tries a local propose first. On `NotLeader { leader_hint: Some(id) }`,
149 /// looks up the hinted leader's address in the cluster topology and sends
150 /// a `DataProposeRequest` over QUIC. The receiving leader applies the
151 /// proposal locally and returns `(group_id, log_index)`.
152 ///
153 /// On `NotLeader { leader_hint: None }` (election in progress) the call
154 /// returns the original `NotLeader` error so the caller can retry.
155 pub async fn propose_via_data_leader(
156 &self,
157 vshard_id: u32,
158 data: Vec<u8>,
159 ) -> Result<(u64, u64)> {
160 // First, try a local propose.
161 match self.propose(vshard_id, data.clone()) {
162 Ok(pair) => Ok(pair),
163 Err(crate::error::ClusterError::Raft(nodedb_raft::RaftError::NotLeader {
164 leader_hint,
165 })) => {
166 let Some(leader_id) = leader_hint else {
167 return Err(crate::error::ClusterError::Raft(
168 nodedb_raft::RaftError::NotLeader { leader_hint: None },
169 ));
170 };
171 if leader_id == self.node_id {
172 return Err(crate::error::ClusterError::Raft(
173 nodedb_raft::RaftError::NotLeader {
174 leader_hint: Some(leader_id),
175 },
176 ));
177 }
178 // Otherwise forward to the hinted leader.
179 self.forward_data_propose(leader_id, vshard_id, data).await
180 }
181 Err(other) => Err(other),
182 }
183 }
184
185 /// Send a `DataProposeRequest` to `leader_id`.
186 async fn forward_data_propose(
187 &self,
188 leader_id: u64,
189 vshard_id: u32,
190 data: Vec<u8>,
191 ) -> Result<(u64, u64)> {
192 {
193 let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
194 let Some(node) = topo.get_node(leader_id) else {
195 return Err(crate::error::ClusterError::Transport {
196 detail: format!(
197 "data propose forward: leader {leader_id} not in local topology"
198 ),
199 });
200 };
201 let Some(addr) = node.socket_addr() else {
202 return Err(crate::error::ClusterError::Transport {
203 detail: format!(
204 "data propose forward: leader {leader_id} has unparseable addr {:?}",
205 node.addr
206 ),
207 });
208 };
209 self.transport.register_peer(leader_id, addr);
210 }
211
212 let req =
213 crate::rpc_codec::RaftRpc::DataProposeRequest(crate::rpc_codec::DataProposeRequest {
214 vshard_id,
215 bytes: data,
216 });
217 let resp = self.transport.send_rpc(leader_id, req).await?;
218 match resp {
219 crate::rpc_codec::RaftRpc::DataProposeResponse(r) => {
220 if r.success {
221 Ok((r.group_id, r.log_index))
222 } else if let Some(hint) = r.leader_hint {
223 Err(crate::error::ClusterError::Raft(
224 nodedb_raft::RaftError::NotLeader {
225 leader_hint: Some(hint),
226 },
227 ))
228 } else {
229 Err(crate::error::ClusterError::Transport {
230 detail: format!("data propose forward failed: {}", r.error_message),
231 })
232 }
233 }
234 other => Err(crate::error::ClusterError::Transport {
235 detail: format!("data propose forward: unexpected response variant {other:?}"),
236 }),
237 }
238 }
239
240 /// Propose a configuration change to a Raft group.
241 ///
242 /// Returns `(group_id, log_index)` on success.
243 pub fn propose_conf_change(&self, group_id: u64, change: &ConfChange) -> Result<(u64, u64)> {
244 let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
245 mr.propose_conf_change(group_id, change)
246 }
247}