dgate/cluster/raft/
mod.rs

//! Raft consensus implementation for DGate
//!
//! This module provides a full Raft consensus implementation using openraft,
//! with leader election, log replication, and snapshot transfer.
//!
//! # Architecture
//!
//! - `RaftConsensus`: Main Raft wrapper implementing the Consensus trait
//! - `RaftLogStore`: In-memory log storage
//! - `DGateStateMachine`: State machine for applying logs
//! - `NetworkFactory`: HTTP-based networking between nodes
13mod network;
14mod state_machine;
15mod store;
16
17use std::collections::BTreeMap;
18use std::collections::BTreeSet;
19use std::io::Cursor;
20use std::sync::Arc;
21use std::time::Duration;
22
23use async_trait::async_trait;
24use openraft::raft::ClientWriteResponse;
25use openraft::Config as RaftConfig;
26use openraft::{BasicNode, ChangeMembers, Raft};
27use serde::{Deserialize, Serialize};
28use tokio::sync::{mpsc, RwLock};
29use tracing::{debug, info, warn};
30
31use super::consensus::{Consensus, ConsensusMetrics, ConsensusResponse, NodeId, NodeState};
32use super::discovery::NodeDiscovery;
33use crate::config::{ClusterConfig, ClusterMember, ClusterMode};
34use crate::resources::ChangeLog;
35use crate::storage::ProxyStore;
36
37pub use network::NetworkFactory;
38pub use state_machine::DGateStateMachine;
39pub use store::RaftLogStore;
40
// Raft type configuration using openraft's declarative macro:
// log entries carry `ChangeLog` payloads (D), the state machine replies with
// `RaftClientResponse` (R), and peers are addressed via `BasicNode`.
openraft::declare_raft_types!(
    pub TypeConfig:
        D = ChangeLog,
        R = RaftClientResponse,
        Node = BasicNode,
);
48
/// Response from the Raft state machine after applying a log entry.
///
/// Returned through `client_write` to the proposer once the entry is applied.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RaftClientResponse {
    // Whether the state machine applied the entry successfully.
    pub success: bool,
    // Optional human-readable detail (e.g. an error description on failure).
    pub message: Option<String>,
}
55
56impl From<RaftClientResponse> for ConsensusResponse {
57    fn from(r: RaftClientResponse) -> Self {
58        ConsensusResponse {
59            success: r.success,
60            message: r.message,
61        }
62    }
63}
64
/// Snapshot data for the state machine.
///
/// Serializable container transferred during snapshot installation.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct SnapshotData {
    // Changelogs captured at snapshot time — presumably sufficient to
    // rebuild state; confirm against `DGateStateMachine` snapshot logic.
    pub changelogs: Vec<ChangeLog>,
}
70
/// The Raft instance type alias for convenience (openraft `Raft` specialized
/// to this module's `TypeConfig`).
pub type DGateRaft = Raft<TypeConfig>;
73
/// Raft consensus implementation.
///
/// Wraps an openraft instance plus optional node discovery, and implements
/// the `Consensus` trait for cluster-wide changelog replication.
pub struct RaftConsensus {
    /// Cluster configuration (node id, advertise address, bootstrap flags).
    config: ClusterConfig,
    /// The underlying openraft instance.
    raft: Arc<DGateRaft>,
    /// Node discovery service; `None` when discovery is not configured.
    discovery: Option<Arc<NodeDiscovery>>,
    /// Cached members list, seeded from config and updated on add/remove.
    cached_members: RwLock<Vec<ClusterMember>>,
}
85
86impl RaftConsensus {
87    /// Create a new Raft consensus manager
88    pub async fn new(
89        cluster_config: ClusterConfig,
90        store: Arc<ProxyStore>,
91        change_tx: mpsc::UnboundedSender<ChangeLog>,
92    ) -> anyhow::Result<Self> {
93        let node_id = cluster_config.node_id;
94
95        info!(
96            "Creating Raft consensus for node {} at {}",
97            node_id, cluster_config.advertise_addr
98        );
99
100        // Create Raft configuration
101        let raft_config = RaftConfig {
102            cluster_name: "dgate-cluster".to_string(),
103            heartbeat_interval: 200,
104            election_timeout_min: 500,
105            election_timeout_max: 1000,
106            snapshot_policy: openraft::SnapshotPolicy::LogsSinceLast(1000),
107            max_in_snapshot_log_to_keep: 100,
108            ..Default::default()
109        };
110
111        let raft_config = Arc::new(raft_config.validate()?);
112
113        // Create log store
114        let log_store = RaftLogStore::new();
115
116        // Create network factory
117        let network_factory = NetworkFactory::new();
118
119        // Create state machine
120        let state_machine = DGateStateMachine::with_change_notifier(store, change_tx);
121
122        // Create the Raft instance
123        let raft = Raft::new(
124            node_id,
125            raft_config,
126            network_factory,
127            log_store,
128            state_machine,
129        )
130        .await?;
131
132        let raft = Arc::new(raft);
133
134        // Setup discovery if configured
135        let discovery = cluster_config
136            .discovery
137            .as_ref()
138            .map(|disc_config| Arc::new(NodeDiscovery::new(disc_config.clone())));
139
140        // Cache initial members from config
141        let cached_members = RwLock::new(cluster_config.initial_members.clone());
142
143        Ok(Self {
144            config: cluster_config,
145            raft,
146            discovery,
147            cached_members,
148        })
149    }
150
151    /// Get the Raft instance for direct access (e.g., for handling RPC requests)
152    pub fn raft(&self) -> &Arc<DGateRaft> {
153        &self.raft
154    }
155
156    /// Bootstrap this node as a single-node cluster
157    async fn bootstrap_single_node(&self) -> anyhow::Result<()> {
158        let node_id = self.config.node_id;
159        let mut members = BTreeMap::new();
160        members.insert(
161            node_id,
162            BasicNode {
163                addr: self.config.advertise_addr.clone(),
164            },
165        );
166
167        match self.raft.initialize(members).await {
168            Ok(_) => {
169                info!("Successfully bootstrapped single-node Raft cluster");
170                Ok(())
171            }
172            Err(e) => {
173                if e.to_string().contains("already initialized") {
174                    debug!("Raft cluster already initialized");
175                    Ok(())
176                } else {
177                    Err(e.into())
178                }
179            }
180        }
181    }
182
183    /// Bootstrap as the initial leader
184    ///
185    /// When bootstrapping, we include ALL initial members in the initial cluster.
186    /// This avoids the complexity and race conditions of adding nodes one by one.
187    async fn bootstrap_cluster(&self) -> anyhow::Result<()> {
188        let node_id = self.config.node_id;
189        let mut members = BTreeMap::new();
190
191        // Add this node first
192        members.insert(
193            node_id,
194            BasicNode {
195                addr: self.config.advertise_addr.clone(),
196            },
197        );
198
199        // Add all other initial members
200        for member in &self.config.initial_members {
201            if member.id != node_id {
202                members.insert(
203                    member.id,
204                    BasicNode {
205                        addr: member.addr.clone(),
206                    },
207                );
208            }
209        }
210
211        info!(
212            "Bootstrapping Raft cluster with {} members (node {} as leader)",
213            members.len(),
214            node_id
215        );
216
217        match self.raft.initialize(members).await {
218            Ok(_) => {
219                info!("Successfully bootstrapped Raft cluster");
220                Ok(())
221            }
222            Err(e) => {
223                if e.to_string().contains("already initialized") {
224                    debug!("Raft cluster already initialized");
225                    Ok(())
226                } else {
227                    Err(e.into())
228                }
229            }
230        }
231    }
232
233    /// Join an existing cluster
234    ///
235    /// Since the bootstrap node initializes the cluster with all members,
236    /// non-bootstrap nodes just need to wait for the leader to replicate to them.
237    /// The Raft protocol will handle the log synchronization automatically.
238    async fn join_cluster(&self) -> anyhow::Result<()> {
239        let node_id = self.config.node_id;
240        let my_addr = &self.config.advertise_addr;
241
242        info!(
243            "Node {} at {} waiting to receive Raft replication from leader",
244            node_id, my_addr
245        );
246
247        // The node is already included in the initial membership by the bootstrap node.
248        // Raft will automatically handle log replication once the leader connects to us.
249        // We just need to be ready to receive append_entries RPCs.
250
251        Ok(())
252    }
253
254    /// Discovery loop that periodically checks for new nodes
255    async fn run_discovery_loop(
256        discovery: Arc<NodeDiscovery>,
257        raft: Arc<DGateRaft>,
258        my_node_id: NodeId,
259    ) {
260        loop {
261            let nodes = discovery.discover().await;
262            for (node_id, node) in nodes {
263                let metrics = raft.metrics().borrow().clone();
264                if metrics.current_leader == Some(my_node_id) {
265                    // Step 1: Add as learner
266                    let add_learner =
267                        ChangeMembers::AddNodes([(node_id, node.clone())].into_iter().collect());
268
269                    match raft.change_membership(add_learner, false).await {
270                        Ok(_) => {
271                            info!("Added discovered node {} as learner", node_id);
272                            // Step 2: Promote to voter
273                            let mut voter_ids = BTreeSet::new();
274                            voter_ids.insert(node_id);
275                            let promote = ChangeMembers::AddVoterIds(voter_ids);
276                            match raft.change_membership(promote, false).await {
277                                Ok(_) => info!("Promoted discovered node {} to voter", node_id),
278                                Err(e) => debug!("Could not promote node {}: {}", node_id, e),
279                            }
280                        }
281                        Err(e) => debug!("Could not add node {}: {}", node_id, e),
282                    }
283                }
284            }
285
286            tokio::time::sleep(Duration::from_secs(10)).await;
287        }
288    }
289}
290
291#[async_trait]
292impl Consensus for RaftConsensus {
293    fn node_id(&self) -> NodeId {
294        self.config.node_id
295    }
296
297    fn mode(&self) -> ClusterMode {
298        self.config.mode
299    }
300
301    async fn initialize(&self) -> anyhow::Result<()> {
302        let node_id = self.config.node_id;
303
304        if self.config.bootstrap {
305            info!(
306                "Bootstrapping Raft cluster with node_id={} as initial leader",
307                node_id
308            );
309            self.bootstrap_cluster().await?;
310        } else if !self.config.initial_members.is_empty() {
311            info!(
312                "Joining existing Raft cluster with {} known members",
313                self.config.initial_members.len()
314            );
315            self.join_cluster().await?;
316        } else {
317            warn!("No bootstrap flag and no initial members - starting as isolated node");
318            self.bootstrap_single_node().await?;
319        }
320
321        // Start discovery background task if configured
322        if let Some(ref discovery) = self.discovery {
323            let discovery_clone = discovery.clone();
324            let raft_clone = self.raft.clone();
325            let my_node_id = self.config.node_id;
326            tokio::spawn(async move {
327                Self::run_discovery_loop(discovery_clone, raft_clone, my_node_id).await;
328            });
329        }
330
331        Ok(())
332    }
333
334    async fn can_write(&self) -> bool {
335        // In Raft, only the leader can accept writes
336        let metrics = self.raft.metrics().borrow().clone();
337        metrics.current_leader == Some(self.config.node_id)
338    }
339
340    async fn leader_id(&self) -> Option<NodeId> {
341        self.raft.metrics().borrow().current_leader
342    }
343
344    async fn propose(&self, changelog: ChangeLog) -> anyhow::Result<ConsensusResponse> {
345        let result: ClientWriteResponse<TypeConfig> = self
346            .raft
347            .client_write(changelog)
348            .await
349            .map_err(|e| anyhow::anyhow!("Raft write failed: {}", e))?;
350
351        Ok(result.data.into())
352    }
353
354    async fn metrics(&self) -> ConsensusMetrics {
355        let raft_metrics = self.raft.metrics().borrow().clone();
356        let members = self.cached_members.read().await.clone();
357
358        let state = match raft_metrics.state {
359            openraft::ServerState::Leader => NodeState::Leader,
360            openraft::ServerState::Follower => NodeState::Follower,
361            openraft::ServerState::Candidate => NodeState::Candidate,
362            openraft::ServerState::Learner => NodeState::Learner,
363            openraft::ServerState::Shutdown => NodeState::Shutdown,
364        };
365
366        ConsensusMetrics {
367            id: self.config.node_id,
368            mode: self.config.mode,
369            can_write: raft_metrics.current_leader == Some(self.config.node_id),
370            leader_id: raft_metrics.current_leader,
371            state,
372            current_term: Some(raft_metrics.vote.leader_id().term),
373            last_applied: raft_metrics.last_applied.map(|l| l.index),
374            committed: raft_metrics.last_applied.map(|l| l.index),
375            members,
376            extra: None,
377        }
378    }
379
380    async fn add_node(&self, node_id: NodeId, addr: String) -> anyhow::Result<()> {
381        info!("Adding node {} at {} to Raft cluster", node_id, addr);
382
383        // Step 1: Add the node as a learner first
384        let node = BasicNode { addr: addr.clone() };
385        let add_learner = ChangeMembers::AddNodes([(node_id, node)].into_iter().collect());
386
387        self.raft
388            .change_membership(add_learner, false)
389            .await
390            .map_err(|e| anyhow::anyhow!("Failed to add node as learner: {}", e))?;
391
392        info!("Node {} added as learner, now promoting to voter", node_id);
393
394        // Step 2: Promote the learner to a voter
395        let mut voter_ids = BTreeSet::new();
396        voter_ids.insert(node_id);
397        let promote_voter = ChangeMembers::AddVoterIds(voter_ids);
398
399        self.raft
400            .change_membership(promote_voter, false)
401            .await
402            .map_err(|e| anyhow::anyhow!("Failed to promote node to voter: {}", e))?;
403
404        info!("Node {} successfully promoted to voter", node_id);
405
406        // Update cached members
407        let mut cached = self.cached_members.write().await;
408        if !cached.iter().any(|m| m.id == node_id) {
409            cached.push(ClusterMember {
410                id: node_id,
411                addr,
412                admin_port: None,
413                tls: false,
414            });
415        }
416
417        Ok(())
418    }
419
420    async fn remove_node(&self, node_id: NodeId) -> anyhow::Result<()> {
421        info!("Removing node {} from Raft cluster", node_id);
422
423        let mut remove_set = BTreeSet::new();
424        remove_set.insert(node_id);
425
426        let change = ChangeMembers::RemoveNodes(remove_set);
427
428        self.raft
429            .change_membership(change, false)
430            .await
431            .map_err(|e| anyhow::anyhow!("Failed to remove node: {}", e))?;
432
433        // Update cached members
434        let mut cached = self.cached_members.write().await;
435        cached.retain(|m| m.id != node_id);
436
437        Ok(())
438    }
439
440    async fn members(&self) -> Vec<ClusterMember> {
441        self.cached_members.read().await.clone()
442    }
443
444    async fn shutdown(&self) -> anyhow::Result<()> {
445        info!("Shutting down Raft consensus");
446        self.raft
447            .shutdown()
448            .await
449            .map_err(|e| anyhow::anyhow!("Shutdown failed: {}", e))?;
450        Ok(())
451    }
452}