Skip to main content

nodedb_cluster/raft_loop/
proposals.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Raft proposal API — local and leader-forwarded proposals for both
4//! the metadata group (group 0) and data groups.
5
6use crate::conf_change::ConfChange;
7use crate::error::Result;
8
9use super::loop_core::{CommitApplier, RaftLoop};
10use crate::forward::PlanExecutor;
11
12impl<A: CommitApplier, P: PlanExecutor> RaftLoop<A, P> {
13    /// Propose a command to the Raft group owning the given vShard.
14    ///
15    /// Returns `(group_id, log_index)` on success.
16    pub fn propose(&self, vshard_id: u32, data: Vec<u8>) -> Result<(u64, u64)> {
17        let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
18        mr.propose(vshard_id, data)
19    }
20
21    /// Propose a command directly to the metadata Raft group (group 0).
22    ///
23    /// Used by the host crate's metadata proposer and by integration
24    /// tests that exercise the replicated-catalog path without a
25    /// pgwire client. Fails with `ClusterError::GroupNotFound` if
26    /// group 0 does not exist on this node, and with
27    /// `ClusterError::Raft(NotLeader)` if this node is not the
28    /// current leader of group 0.
29    pub fn propose_to_metadata_group(&self, data: Vec<u8>) -> Result<u64> {
30        let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
31        mr.propose_to_group(crate::metadata_group::METADATA_GROUP_ID, data)
32    }
33
34    /// Propose to the metadata Raft group, transparently forwarding
35    /// to the current leader if this node is not it.
36    ///
37    /// Tries a local propose first. On
38    /// `ClusterError::Raft(NotLeader { leader_hint })`, looks up the
39    /// hinted leader's address in cluster topology and sends a
40    /// [`crate::rpc_codec::MetadataProposeRequest`] over QUIC. The
41    /// receiving leader applies the proposal locally and returns
42    /// the log index.
43    ///
44    /// On `NotLeader { leader_hint: None }` (election in progress,
45    /// no observed leader yet) the call returns the original
46    /// `NotLeader` error so the caller can decide whether to retry.
47    /// We deliberately do not implement a wait-and-retry loop here
48    /// because the caller (the host-side proposer) may have a
49    /// shorter deadline than any reasonable retry budget.
50    ///
51    /// The leader-side path through this function is identical to
52    /// the bare `propose_to_metadata_group` — the only extra cost is
53    /// an `is_leader_locally` check before the local propose.
54    pub async fn propose_to_metadata_group_via_leader(&self, data: Vec<u8>) -> Result<u64> {
55        // First, try a local propose.
56        match self.propose_to_metadata_group(data.clone()) {
57            Ok(idx) => Ok(idx),
58            Err(crate::error::ClusterError::Raft(nodedb_raft::RaftError::NotLeader {
59                leader_hint,
60            })) => {
61                let Some(leader_id) = leader_hint else {
62                    return Err(crate::error::ClusterError::Raft(
63                        nodedb_raft::RaftError::NotLeader { leader_hint: None },
64                    ));
65                };
66                if leader_id == self.node_id {
67                    // Should not happen — local propose said we
68                    // weren't leader but the hint points at us. Fall
69                    // through to the original error so the caller
70                    // sees the contradiction.
71                    return Err(crate::error::ClusterError::Raft(
72                        nodedb_raft::RaftError::NotLeader {
73                            leader_hint: Some(leader_id),
74                        },
75                    ));
76                }
77                // Otherwise forward to the hinted leader.
78                self.forward_metadata_propose(leader_id, data).await
79            }
80            Err(other) => Err(other),
81        }
82    }
83
84    /// Send a `MetadataProposeRequest` to `leader_id`. Looks up the
85    /// leader's listen address via the local topology snapshot and
86    /// dispatches through the existing peer transport.
87    async fn forward_metadata_propose(&self, leader_id: u64, data: Vec<u8>) -> Result<u64> {
88        // Resolve and register the leader's address with the
89        // transport so `send_rpc` has a destination. Topology is
90        // updated by the membership / health subsystem; if the
91        // leader isn't in our local topology yet we fail loudly so
92        // the caller can fall back to its own retry policy rather
93        // than silently dropping the proposal.
94        {
95            let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
96            let Some(node) = topo.get_node(leader_id) else {
97                return Err(crate::error::ClusterError::Transport {
98                    detail: format!(
99                        "metadata propose forward: leader {leader_id} not in local topology"
100                    ),
101                });
102            };
103            let Some(addr) = node.socket_addr() else {
104                return Err(crate::error::ClusterError::Transport {
105                    detail: format!(
106                        "metadata propose forward: leader {leader_id} has unparseable addr {:?}",
107                        node.addr
108                    ),
109                });
110            };
111            // Idempotent: register_peer overwrites any prior mapping.
112            self.transport.register_peer(leader_id, addr);
113        }
114
115        let req = crate::rpc_codec::RaftRpc::MetadataProposeRequest(
116            crate::rpc_codec::MetadataProposeRequest { bytes: data },
117        );
118        let resp = self.transport.send_rpc(leader_id, req).await?;
119        match resp {
120            crate::rpc_codec::RaftRpc::MetadataProposeResponse(r) => {
121                if r.success {
122                    Ok(r.log_index)
123                } else if let Some(hint) = r.leader_hint {
124                    // The receiving node was also not the leader
125                    // (rare: leader changed between our local check
126                    // and the forwarded RPC). Surface as NotLeader
127                    // so the caller's normal retry path runs.
128                    Err(crate::error::ClusterError::Raft(
129                        nodedb_raft::RaftError::NotLeader {
130                            leader_hint: Some(hint),
131                        },
132                    ))
133                } else {
134                    Err(crate::error::ClusterError::Transport {
135                        detail: format!("metadata propose forward failed: {}", r.error_message),
136                    })
137                }
138            }
139            other => Err(crate::error::ClusterError::Transport {
140                detail: format!("metadata propose forward: unexpected response variant {other:?}"),
141            }),
142        }
143    }
144
145    /// Propose a command to the data Raft group owning the given vShard,
146    /// transparently forwarding to the group leader if this node is not it.
147    ///
148    /// Tries a local propose first. On `NotLeader { leader_hint: Some(id) }`,
149    /// looks up the hinted leader's address in the cluster topology and sends
150    /// a `DataProposeRequest` over QUIC. The receiving leader applies the
151    /// proposal locally and returns `(group_id, log_index)`.
152    ///
153    /// On `NotLeader { leader_hint: None }` (election in progress) the call
154    /// returns the original `NotLeader` error so the caller can retry.
155    pub async fn propose_via_data_leader(
156        &self,
157        vshard_id: u32,
158        data: Vec<u8>,
159    ) -> Result<(u64, u64)> {
160        // First, try a local propose.
161        match self.propose(vshard_id, data.clone()) {
162            Ok(pair) => Ok(pair),
163            Err(crate::error::ClusterError::Raft(nodedb_raft::RaftError::NotLeader {
164                leader_hint,
165            })) => {
166                let Some(leader_id) = leader_hint else {
167                    return Err(crate::error::ClusterError::Raft(
168                        nodedb_raft::RaftError::NotLeader { leader_hint: None },
169                    ));
170                };
171                if leader_id == self.node_id {
172                    return Err(crate::error::ClusterError::Raft(
173                        nodedb_raft::RaftError::NotLeader {
174                            leader_hint: Some(leader_id),
175                        },
176                    ));
177                }
178                // Otherwise forward to the hinted leader.
179                self.forward_data_propose(leader_id, vshard_id, data).await
180            }
181            Err(other) => Err(other),
182        }
183    }
184
185    /// Send a `DataProposeRequest` to `leader_id`.
186    async fn forward_data_propose(
187        &self,
188        leader_id: u64,
189        vshard_id: u32,
190        data: Vec<u8>,
191    ) -> Result<(u64, u64)> {
192        {
193            let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
194            let Some(node) = topo.get_node(leader_id) else {
195                return Err(crate::error::ClusterError::Transport {
196                    detail: format!(
197                        "data propose forward: leader {leader_id} not in local topology"
198                    ),
199                });
200            };
201            let Some(addr) = node.socket_addr() else {
202                return Err(crate::error::ClusterError::Transport {
203                    detail: format!(
204                        "data propose forward: leader {leader_id} has unparseable addr {:?}",
205                        node.addr
206                    ),
207                });
208            };
209            self.transport.register_peer(leader_id, addr);
210        }
211
212        let req =
213            crate::rpc_codec::RaftRpc::DataProposeRequest(crate::rpc_codec::DataProposeRequest {
214                vshard_id,
215                bytes: data,
216            });
217        let resp = self.transport.send_rpc(leader_id, req).await?;
218        match resp {
219            crate::rpc_codec::RaftRpc::DataProposeResponse(r) => {
220                if r.success {
221                    Ok((r.group_id, r.log_index))
222                } else if let Some(hint) = r.leader_hint {
223                    Err(crate::error::ClusterError::Raft(
224                        nodedb_raft::RaftError::NotLeader {
225                            leader_hint: Some(hint),
226                        },
227                    ))
228                } else {
229                    Err(crate::error::ClusterError::Transport {
230                        detail: format!("data propose forward failed: {}", r.error_message),
231                    })
232                }
233            }
234            other => Err(crate::error::ClusterError::Transport {
235                detail: format!("data propose forward: unexpected response variant {other:?}"),
236            }),
237        }
238    }
239
240    /// Propose a configuration change to a Raft group.
241    ///
242    /// Returns `(group_id, log_index)` on success.
243    pub fn propose_conf_change(&self, group_id: u64, change: &ConfChange) -> Result<(u64, u64)> {
244        let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
245        mr.propose_conf_change(group_id, change)
246    }
247}