// zlayer_consensus/node.rs

1//! High-level `ConsensusNode` wrapper around `openraft::Raft`.
2//!
3//! Provides a builder pattern for constructing Raft nodes with the desired
4//! storage and network backends, plus convenience methods for common operations.
5
6use std::collections::BTreeMap;
7use std::collections::BTreeSet;
8use std::sync::Arc;
9
10use openraft::impls::OneshotResponder;
11use openraft::{BasicNode, Raft, RaftMetrics, RaftTypeConfig};
12use tracing::info;
13
14use crate::config::ConsensusConfig;
15use crate::error::{ConsensusError, Result};
16use crate::types::NodeId;
17
/// A high-level wrapper around `openraft::Raft`.
///
/// Provides ergonomic methods for common Raft operations:
/// proposing writes, linearizable reads, cluster membership changes,
/// and metrics observation.
///
/// Construct via [`ConsensusNodeBuilder`].
///
/// Note: this requires `C::Responder = OneshotResponder<C>` which is the
/// default when using `declare_raft_types!`. This enables the blocking
/// `client_write`, `add_learner`, and `change_membership` APIs.
pub struct ConsensusNode<C>
where
    C: RaftTypeConfig<NodeId = NodeId, Node = BasicNode, Responder = OneshotResponder<C>>,
{
    /// The inner openraft `Raft` instance (cheaply cloneable, Arc-based).
    raft: Raft<C>,
    /// This node's ID.
    node_id: NodeId,
    /// This node's advertised address, stored in `BasicNode.addr` when the
    /// node joins a cluster (exact format is up to the network layer —
    /// not validated here).
    address: String,
}
40
41impl<C> ConsensusNode<C>
42where
43    C: RaftTypeConfig<NodeId = NodeId, Node = BasicNode, Responder = OneshotResponder<C>>,
44{
45    /// Create a `ConsensusNode` directly from an already-constructed `Raft` instance.
46    #[must_use]
47    pub fn from_raft(raft: Raft<C>, node_id: NodeId, address: String) -> Self {
48        Self {
49            raft,
50            node_id,
51            address,
52        }
53    }
54
55    /// This node's ID.
56    #[must_use]
57    pub fn node_id(&self) -> NodeId {
58        self.node_id
59    }
60
61    /// This node's address.
62    #[must_use]
63    pub fn address(&self) -> &str {
64        &self.address
65    }
66
67    /// Get a reference to the inner `Raft` instance for advanced usage.
68    #[must_use]
69    pub fn raft(&self) -> &Raft<C> {
70        &self.raft
71    }
72
73    /// Get a clone of the inner `Raft` instance (cheap, Arc-based).
74    #[must_use]
75    pub fn raft_clone(&self) -> Raft<C> {
76        self.raft.clone()
77    }
78
79    /// Propose a client write to the Raft cluster.
80    ///
81    /// This node must be the leader. The write is replicated to a quorum
82    /// before the response is returned.
83    ///
84    /// # Errors
85    /// Returns `ConsensusError::Write` if the write fails (e.g., not the leader).
86    pub async fn propose(&self, request: C::D) -> Result<C::R> {
87        let result = self
88            .raft
89            .client_write(request)
90            .await
91            .map_err(|e| ConsensusError::Write(e.to_string()))?;
92
93        Ok(result.data)
94    }
95
96    /// Ensure linearizable reads by confirming this node is still the leader.
97    ///
98    /// Call this before reading from the state machine to guarantee that the
99    /// data is not stale. Implements the "leader lease read" pattern.
100    ///
101    /// # Errors
102    /// Returns `ConsensusError::Write` if the linearizable check fails.
103    pub async fn ensure_linearizable(&self) -> Result<()> {
104        self.raft
105            .ensure_linearizable()
106            .await
107            .map_err(|e| ConsensusError::Write(format!("linearizable check failed: {e}")))?;
108        Ok(())
109    }
110
111    /// Check if this node is the current leader.
112    #[must_use]
113    pub fn is_leader(&self) -> bool {
114        let metrics = self.raft.metrics().borrow().clone();
115        metrics.current_leader == Some(self.node_id)
116    }
117
118    /// Get the current leader's node ID, if known.
119    #[must_use]
120    pub fn leader_id(&self) -> Option<NodeId> {
121        self.raft.metrics().borrow().current_leader
122    }
123
124    /// Returns the set of current voter node IDs.
125    #[must_use]
126    pub fn voter_ids(&self) -> BTreeSet<NodeId> {
127        let metrics = self.raft.metrics().borrow().clone();
128        metrics.membership_config.membership().voter_ids().collect()
129    }
130
131    /// Returns the number of current voters.
132    #[must_use]
133    pub fn voter_count(&self) -> usize {
134        self.voter_ids().len()
135    }
136
137    /// Returns the set of current learner node IDs (non-voters).
138    #[must_use]
139    pub fn learner_ids(&self) -> BTreeSet<NodeId> {
140        let metrics = self.raft.metrics().borrow().clone();
141        metrics
142            .membership_config
143            .membership()
144            .learner_ids()
145            .collect()
146    }
147
148    /// Returns all member node IDs (voters + learners).
149    #[must_use]
150    pub fn all_member_ids(&self) -> BTreeSet<NodeId> {
151        let metrics = self.raft.metrics().borrow().clone();
152        let membership = metrics.membership_config.membership();
153        let mut ids: BTreeSet<NodeId> = membership.voter_ids().collect();
154        ids.extend(membership.learner_ids());
155        ids
156    }
157
158    /// Bootstrap a new single-node cluster.
159    ///
160    /// This must only be called once, on the first node, when creating a new cluster.
161    ///
162    /// # Errors
163    /// Returns `ConsensusError::Init` if bootstrap initialization fails.
164    pub async fn bootstrap(&self) -> Result<()> {
165        let mut members = BTreeMap::new();
166        members.insert(
167            self.node_id,
168            BasicNode {
169                addr: self.address.clone(),
170            },
171        );
172
173        self.raft
174            .initialize(members)
175            .await
176            .map_err(|e| ConsensusError::Init(format!("bootstrap failed: {e}")))?;
177
178        info!(node_id = self.node_id, "Bootstrapped single-node cluster");
179        Ok(())
180    }
181
182    /// Add a learner to the cluster.
183    ///
184    /// A learner receives log entries but does not vote. Use this to
185    /// pre-sync a node before promoting it to a voter.
186    ///
187    /// If `blocking` is true, waits until the learner has caught up with the log.
188    ///
189    /// # Errors
190    /// Returns `ConsensusError::Membership` if the learner cannot be added.
191    pub async fn add_learner(
192        &self,
193        node_id: NodeId,
194        address: String,
195        blocking: bool,
196    ) -> Result<()> {
197        let node = BasicNode { addr: address };
198
199        self.raft
200            .add_learner(node_id, node, blocking)
201            .await
202            .map_err(|e| {
203                ConsensusError::Membership(format!("add_learner({node_id}) failed: {e}"))
204            })?;
205
206        info!(node_id, "Added learner");
207        Ok(())
208    }
209
210    /// Change the cluster membership (promote learners to voters, or remove members).
211    ///
212    /// Pass the complete set of voter node IDs. Nodes not in the set will be
213    /// demoted or removed.
214    ///
215    /// If `retain` is true, nodes not in `voter_ids` are kept as learners
216    /// rather than removed entirely.
217    ///
218    /// # Errors
219    /// Returns `ConsensusError::Membership` if the membership change fails.
220    pub async fn change_membership(&self, voter_ids: BTreeSet<NodeId>, retain: bool) -> Result<()> {
221        self.raft
222            .change_membership(voter_ids, retain)
223            .await
224            .map_err(|e| ConsensusError::Membership(format!("change_membership failed: {e}")))?;
225
226        info!("Membership change committed");
227        Ok(())
228    }
229
230    /// Convenience: add a node as learner then promote it to voter.
231    ///
232    /// This performs the full two-step process:
233    /// 1. Add as learner (blocking, waits for log sync)
234    /// 2. Change membership to include the new voter
235    ///
236    /// # Errors
237    /// Returns a `ConsensusError` if either the learner addition or membership change fails.
238    pub async fn add_voter(&self, node_id: NodeId, address: String) -> Result<()> {
239        // Step 1: add as learner
240        self.add_learner(node_id, address, true).await?;
241
242        // Step 2: collect current voters and add the new one
243        let metrics = self.raft.metrics().borrow().clone();
244        let mut voter_ids = BTreeSet::new();
245
246        if let Some(membership) = metrics
247            .membership_config
248            .membership()
249            .get_joint_config()
250            .last()
251        {
252            for id in membership {
253                voter_ids.insert(*id);
254            }
255        }
256        voter_ids.insert(self.node_id);
257        voter_ids.insert(node_id);
258
259        self.change_membership(voter_ids, false).await?;
260
261        info!(node_id, "Promoted learner to voter");
262        Ok(())
263    }
264
265    /// Get current Raft metrics (leader, term, log indices, etc.).
266    #[must_use]
267    pub fn metrics(&self) -> RaftMetrics<NodeId, BasicNode> {
268        self.raft.metrics().borrow().clone()
269    }
270
271    /// Gracefully shut down the Raft node.
272    ///
273    /// # Errors
274    /// Returns `ConsensusError::Fatal` if shutdown fails.
275    pub async fn shutdown(&self) -> Result<()> {
276        self.raft
277            .shutdown()
278            .await
279            .map_err(|e| ConsensusError::Fatal(format!("shutdown failed: {e}")))?;
280
281        info!(node_id = self.node_id, "Raft node shut down");
282        Ok(())
283    }
284}
285
286// ---------------------------------------------------------------------------
287// Builder
288// ---------------------------------------------------------------------------
289
/// Builder for constructing a `ConsensusNode`.
///
/// # Example
///
/// ```ignore
/// use zlayer_consensus::ConsensusNodeBuilder;
///
/// let node = ConsensusNodeBuilder::new(1, "127.0.0.1:9000".into())
///     .with_config(ConsensusConfig::default())
///     .build_with(log_store, state_machine, network)
///     .await?;
/// ```
pub struct ConsensusNodeBuilder {
    /// ID for the node being built.
    node_id: NodeId,
    /// Advertised address for the node being built.
    address: String,
    /// Consensus configuration; defaults to `ConsensusConfig::default()`
    /// unless overridden via `with_config`.
    config: ConsensusConfig,
}
307
308impl ConsensusNodeBuilder {
309    /// Create a new builder for the given node ID and address.
310    #[must_use]
311    pub fn new(node_id: NodeId, address: String) -> Self {
312        Self {
313            node_id,
314            address,
315            config: ConsensusConfig::default(),
316        }
317    }
318
319    /// Set the consensus configuration.
320    #[must_use]
321    pub fn with_config(mut self, config: ConsensusConfig) -> Self {
322        self.config = config;
323        self
324    }
325
326    /// Build the `ConsensusNode` with the provided storage and network implementations.
327    ///
328    /// This is the most flexible build method -- you provide all three components.
329    ///
330    /// # Errors
331    /// Returns `ConsensusError::Fatal` if the Raft instance fails to start, or
332    /// a configuration error if the consensus config is invalid.
333    pub async fn build_with<C, LS, SM, N>(
334        self,
335        log_store: LS,
336        state_machine: SM,
337        network: N,
338    ) -> Result<ConsensusNode<C>>
339    where
340        C: RaftTypeConfig<NodeId = NodeId, Node = BasicNode, Responder = OneshotResponder<C>>,
341        LS: openraft::storage::RaftLogStorage<C>,
342        SM: openraft::storage::RaftStateMachine<C>,
343        N: openraft::network::RaftNetworkFactory<C>,
344    {
345        let raft_config = Arc::new(self.config.to_openraft_config()?);
346
347        let raft = Raft::new(self.node_id, raft_config, network, log_store, state_machine)
348            .await
349            .map_err(|e| ConsensusError::Fatal(format!("Failed to create Raft: {e}")))?;
350
351        info!(
352            node_id = self.node_id,
353            address = %self.address,
354            "Created ConsensusNode"
355        );
356
357        Ok(ConsensusNode {
358            raft,
359            node_id: self.node_id,
360            address: self.address,
361        })
362    }
363}