1mod network;
14mod state_machine;
15mod store;
16
17use std::collections::BTreeMap;
18use std::collections::BTreeSet;
19use std::io::Cursor;
20use std::sync::Arc;
21use std::time::Duration;
22
23use async_trait::async_trait;
24use openraft::raft::ClientWriteResponse;
25use openraft::Config as RaftConfig;
26use openraft::{BasicNode, ChangeMembers, Raft};
27use serde::{Deserialize, Serialize};
28use tokio::sync::{mpsc, RwLock};
29use tracing::{debug, info, warn};
30
31use super::consensus::{Consensus, ConsensusMetrics, ConsensusResponse, NodeId, NodeState};
32use super::discovery::NodeDiscovery;
33use crate::config::{ClusterConfig, ClusterMember, ClusterMode};
34use crate::resources::ChangeLog;
35use crate::storage::ProxyStore;
36
37pub use network::NetworkFactory;
38pub use state_machine::DGateStateMachine;
39pub use store::RaftLogStore;
40
// Openraft type configuration for DGate.
//
// - `D`: log entries carry `ChangeLog`s.
// - `R`: each client write is answered with a `RaftClientResponse`.
// - `Node`: cluster nodes are described by `BasicNode` (an address).
//
// Associated types not listed here are left to the macro's defaults.
openraft::declare_raft_types!(
    pub TypeConfig:
        D = ChangeLog,
        R = RaftClientResponse,
        Node = BasicNode,
);
48
/// Application-level response attached to each Raft client write (`TypeConfig::R`).
///
/// `propose` returns it to callers after converting it into [`ConsensusResponse`].
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RaftClientResponse {
    /// Success flag reported for the write.
    pub success: bool,
    /// Optional human-readable detail accompanying the result.
    pub message: Option<String>,
}
55
56impl From<RaftClientResponse> for ConsensusResponse {
57 fn from(r: RaftClientResponse) -> Self {
58 ConsensusResponse {
59 success: r.success,
60 message: r.message,
61 }
62 }
63}
64
/// Serializable snapshot payload: a list of [`ChangeLog`] entries.
///
/// NOTE(review): presumably built and installed by `DGateStateMachine` to rebuild
/// the proxy store; confirm in `state_machine.rs`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct SnapshotData {
    /// Change logs captured in the snapshot.
    pub changelogs: Vec<ChangeLog>,
}
70
/// The openraft node type specialized to this crate's [`TypeConfig`].
pub type DGateRaft = Raft<TypeConfig>;
73
/// [`Consensus`] implementation backed by an openraft [`Raft`] instance.
pub struct RaftConsensus {
    /// Cluster settings this node was created with (node id, advertise address,
    /// mode, bootstrap flag, initial members, optional discovery settings).
    config: ClusterConfig,
    /// Shared openraft handle; a clone is also moved into the discovery task.
    raft: Arc<DGateRaft>,
    /// Peer discovery, present only when `ClusterConfig::discovery` is set.
    discovery: Option<Arc<NodeDiscovery>>,
    /// Locally tracked member list reported by `members()` and `metrics()`.
    ///
    /// It is seeded from `initial_members` and updated by `add_node`/`remove_node`.
    /// It is never read back from Raft, so it can diverge from the real membership
    /// (for example, nodes added by the discovery loop are not recorded here).
    cached_members: RwLock<Vec<ClusterMember>>,
}
85
86impl RaftConsensus {
87 pub async fn new(
89 cluster_config: ClusterConfig,
90 store: Arc<ProxyStore>,
91 change_tx: mpsc::UnboundedSender<ChangeLog>,
92 ) -> anyhow::Result<Self> {
93 let node_id = cluster_config.node_id;
94
95 info!(
96 "Creating Raft consensus for node {} at {}",
97 node_id, cluster_config.advertise_addr
98 );
99
100 let raft_config = RaftConfig {
102 cluster_name: "dgate-cluster".to_string(),
103 heartbeat_interval: 200,
104 election_timeout_min: 500,
105 election_timeout_max: 1000,
106 snapshot_policy: openraft::SnapshotPolicy::LogsSinceLast(1000),
107 max_in_snapshot_log_to_keep: 100,
108 ..Default::default()
109 };
110
111 let raft_config = Arc::new(raft_config.validate()?);
112
113 let log_store = RaftLogStore::new();
115
116 let network_factory = NetworkFactory::new();
118
119 let state_machine = DGateStateMachine::with_change_notifier(store, change_tx);
121
122 let raft = Raft::new(
124 node_id,
125 raft_config,
126 network_factory,
127 log_store,
128 state_machine,
129 )
130 .await?;
131
132 let raft = Arc::new(raft);
133
134 let discovery = cluster_config
136 .discovery
137 .as_ref()
138 .map(|disc_config| Arc::new(NodeDiscovery::new(disc_config.clone())));
139
140 let cached_members = RwLock::new(cluster_config.initial_members.clone());
142
143 Ok(Self {
144 config: cluster_config,
145 raft,
146 discovery,
147 cached_members,
148 })
149 }
150
151 pub fn raft(&self) -> &Arc<DGateRaft> {
153 &self.raft
154 }
155
156 async fn bootstrap_single_node(&self) -> anyhow::Result<()> {
158 let node_id = self.config.node_id;
159 let mut members = BTreeMap::new();
160 members.insert(
161 node_id,
162 BasicNode {
163 addr: self.config.advertise_addr.clone(),
164 },
165 );
166
167 match self.raft.initialize(members).await {
168 Ok(_) => {
169 info!("Successfully bootstrapped single-node Raft cluster");
170 Ok(())
171 }
172 Err(e) => {
173 if e.to_string().contains("already initialized") {
174 debug!("Raft cluster already initialized");
175 Ok(())
176 } else {
177 Err(e.into())
178 }
179 }
180 }
181 }
182
183 async fn bootstrap_cluster(&self) -> anyhow::Result<()> {
188 let node_id = self.config.node_id;
189 let mut members = BTreeMap::new();
190
191 members.insert(
193 node_id,
194 BasicNode {
195 addr: self.config.advertise_addr.clone(),
196 },
197 );
198
199 for member in &self.config.initial_members {
201 if member.id != node_id {
202 members.insert(
203 member.id,
204 BasicNode {
205 addr: member.addr.clone(),
206 },
207 );
208 }
209 }
210
211 info!(
212 "Bootstrapping Raft cluster with {} members (node {} as leader)",
213 members.len(),
214 node_id
215 );
216
217 match self.raft.initialize(members).await {
218 Ok(_) => {
219 info!("Successfully bootstrapped Raft cluster");
220 Ok(())
221 }
222 Err(e) => {
223 if e.to_string().contains("already initialized") {
224 debug!("Raft cluster already initialized");
225 Ok(())
226 } else {
227 Err(e.into())
228 }
229 }
230 }
231 }
232
233 async fn join_cluster(&self) -> anyhow::Result<()> {
239 let node_id = self.config.node_id;
240 let my_addr = &self.config.advertise_addr;
241
242 info!(
243 "Node {} at {} waiting to receive Raft replication from leader",
244 node_id, my_addr
245 );
246
247 Ok(())
252 }
253
254 async fn run_discovery_loop(
256 discovery: Arc<NodeDiscovery>,
257 raft: Arc<DGateRaft>,
258 my_node_id: NodeId,
259 ) {
260 loop {
261 let nodes = discovery.discover().await;
262 for (node_id, node) in nodes {
263 let metrics = raft.metrics().borrow().clone();
264 if metrics.current_leader == Some(my_node_id) {
265 let add_learner =
267 ChangeMembers::AddNodes([(node_id, node.clone())].into_iter().collect());
268
269 match raft.change_membership(add_learner, false).await {
270 Ok(_) => {
271 info!("Added discovered node {} as learner", node_id);
272 let mut voter_ids = BTreeSet::new();
274 voter_ids.insert(node_id);
275 let promote = ChangeMembers::AddVoterIds(voter_ids);
276 match raft.change_membership(promote, false).await {
277 Ok(_) => info!("Promoted discovered node {} to voter", node_id),
278 Err(e) => debug!("Could not promote node {}: {}", node_id, e),
279 }
280 }
281 Err(e) => debug!("Could not add node {}: {}", node_id, e),
282 }
283 }
284 }
285
286 tokio::time::sleep(Duration::from_secs(10)).await;
287 }
288 }
289}
290
291#[async_trait]
292impl Consensus for RaftConsensus {
293 fn node_id(&self) -> NodeId {
294 self.config.node_id
295 }
296
297 fn mode(&self) -> ClusterMode {
298 self.config.mode
299 }
300
301 async fn initialize(&self) -> anyhow::Result<()> {
302 let node_id = self.config.node_id;
303
304 if self.config.bootstrap {
305 info!(
306 "Bootstrapping Raft cluster with node_id={} as initial leader",
307 node_id
308 );
309 self.bootstrap_cluster().await?;
310 } else if !self.config.initial_members.is_empty() {
311 info!(
312 "Joining existing Raft cluster with {} known members",
313 self.config.initial_members.len()
314 );
315 self.join_cluster().await?;
316 } else {
317 warn!("No bootstrap flag and no initial members - starting as isolated node");
318 self.bootstrap_single_node().await?;
319 }
320
321 if let Some(ref discovery) = self.discovery {
323 let discovery_clone = discovery.clone();
324 let raft_clone = self.raft.clone();
325 let my_node_id = self.config.node_id;
326 tokio::spawn(async move {
327 Self::run_discovery_loop(discovery_clone, raft_clone, my_node_id).await;
328 });
329 }
330
331 Ok(())
332 }
333
334 async fn can_write(&self) -> bool {
335 let metrics = self.raft.metrics().borrow().clone();
337 metrics.current_leader == Some(self.config.node_id)
338 }
339
340 async fn leader_id(&self) -> Option<NodeId> {
341 self.raft.metrics().borrow().current_leader
342 }
343
344 async fn propose(&self, changelog: ChangeLog) -> anyhow::Result<ConsensusResponse> {
345 let result: ClientWriteResponse<TypeConfig> = self
346 .raft
347 .client_write(changelog)
348 .await
349 .map_err(|e| anyhow::anyhow!("Raft write failed: {}", e))?;
350
351 Ok(result.data.into())
352 }
353
354 async fn metrics(&self) -> ConsensusMetrics {
355 let raft_metrics = self.raft.metrics().borrow().clone();
356 let members = self.cached_members.read().await.clone();
357
358 let state = match raft_metrics.state {
359 openraft::ServerState::Leader => NodeState::Leader,
360 openraft::ServerState::Follower => NodeState::Follower,
361 openraft::ServerState::Candidate => NodeState::Candidate,
362 openraft::ServerState::Learner => NodeState::Learner,
363 openraft::ServerState::Shutdown => NodeState::Shutdown,
364 };
365
366 ConsensusMetrics {
367 id: self.config.node_id,
368 mode: self.config.mode,
369 can_write: raft_metrics.current_leader == Some(self.config.node_id),
370 leader_id: raft_metrics.current_leader,
371 state,
372 current_term: Some(raft_metrics.vote.leader_id().term),
373 last_applied: raft_metrics.last_applied.map(|l| l.index),
374 committed: raft_metrics.last_applied.map(|l| l.index),
375 members,
376 extra: None,
377 }
378 }
379
380 async fn add_node(&self, node_id: NodeId, addr: String) -> anyhow::Result<()> {
381 info!("Adding node {} at {} to Raft cluster", node_id, addr);
382
383 let node = BasicNode { addr: addr.clone() };
385 let add_learner = ChangeMembers::AddNodes([(node_id, node)].into_iter().collect());
386
387 self.raft
388 .change_membership(add_learner, false)
389 .await
390 .map_err(|e| anyhow::anyhow!("Failed to add node as learner: {}", e))?;
391
392 info!("Node {} added as learner, now promoting to voter", node_id);
393
394 let mut voter_ids = BTreeSet::new();
396 voter_ids.insert(node_id);
397 let promote_voter = ChangeMembers::AddVoterIds(voter_ids);
398
399 self.raft
400 .change_membership(promote_voter, false)
401 .await
402 .map_err(|e| anyhow::anyhow!("Failed to promote node to voter: {}", e))?;
403
404 info!("Node {} successfully promoted to voter", node_id);
405
406 let mut cached = self.cached_members.write().await;
408 if !cached.iter().any(|m| m.id == node_id) {
409 cached.push(ClusterMember {
410 id: node_id,
411 addr,
412 admin_port: None,
413 tls: false,
414 });
415 }
416
417 Ok(())
418 }
419
420 async fn remove_node(&self, node_id: NodeId) -> anyhow::Result<()> {
421 info!("Removing node {} from Raft cluster", node_id);
422
423 let mut remove_set = BTreeSet::new();
424 remove_set.insert(node_id);
425
426 let change = ChangeMembers::RemoveNodes(remove_set);
427
428 self.raft
429 .change_membership(change, false)
430 .await
431 .map_err(|e| anyhow::anyhow!("Failed to remove node: {}", e))?;
432
433 let mut cached = self.cached_members.write().await;
435 cached.retain(|m| m.id != node_id);
436
437 Ok(())
438 }
439
440 async fn members(&self) -> Vec<ClusterMember> {
441 self.cached_members.read().await.clone()
442 }
443
444 async fn shutdown(&self) -> anyhow::Result<()> {
445 info!("Shutting down Raft consensus");
446 self.raft
447 .shutdown()
448 .await
449 .map_err(|e| anyhow::anyhow!("Shutdown failed: {}", e))?;
450 Ok(())
451 }
452}