rocketmq_controller/raft/
node.rs1use std::collections::HashMap;
19use std::sync::Arc;
20
21use raft::prelude::*;
22use raft::storage::MemStorage as RaftMemStorage;
23use raft::StateRole;
24use tokio::sync::RwLock;
25use tracing::debug;
26use tracing::info;
27
28use crate::config::ControllerConfig;
29use crate::error::ControllerError;
30use crate::error::Result;
31
32type ProposalMap = HashMap<Vec<u8>, tokio::sync::oneshot::Sender<Result<Vec<u8>>>>;
34
35pub struct RaftNode {
37 id: u64,
39
40 raw_node: Arc<RwLock<RawNode<RaftMemStorage>>>,
42
43 proposals: Arc<RwLock<ProposalMap>>,
45}
46
47impl RaftNode {
48 pub async fn new(id: u64, config: Arc<ControllerConfig>) -> Result<Self> {
50 let raft_config = Config {
52 id,
53 election_tick: (config.election_timeout_ms / 100) as usize,
54 heartbeat_tick: (config.heartbeat_interval_ms / 100) as usize,
55 max_size_per_msg: 1024 * 1024,
56 max_inflight_msgs: 256,
57 ..Default::default()
58 };
59 raft_config
60 .validate()
61 .map_err(|e| ControllerError::ConfigError(format!("Invalid Raft config: {:?}", e)))?;
62
63 let storage = RaftMemStorage::new();
65
66 let peers: Vec<u64> = config.raft_peers.iter().map(|p| p.id).collect();
68 if !peers.is_empty() {
69 let mut snapshot = Snapshot::default();
70 snapshot.mut_metadata().index = 0;
71 snapshot.mut_metadata().term = 0;
72 snapshot
73 .mut_metadata()
74 .mut_conf_state()
75 .voters
76 .clone_from(&peers);
77
78 storage
79 .wl()
80 .apply_snapshot(snapshot)
81 .map_err(|e| ControllerError::Raft(format!("Failed to apply snapshot: {:?}", e)))?;
82 }
83
84 let raw_node = RawNode::new(&raft_config, storage, &slog_global::get_global())
86 .map_err(|e| ControllerError::Raft(format!("Failed to create RawNode: {:?}", e)))?;
87
88 info!("Created Raft node {} with peers: {:?}", id, peers);
89
90 Ok(Self {
91 id,
92 raw_node: Arc::new(RwLock::new(raw_node)),
93 proposals: Arc::new(RwLock::new(HashMap::new())),
94 })
95 }
96
97 pub async fn propose(&self, data: Vec<u8>) -> Result<Vec<u8>> {
99 let mut raw_node = self.raw_node.write().await;
100
101 if raw_node.raft.state != StateRole::Leader {
103 let leader = raw_node.raft.leader_id;
104 return Err(ControllerError::NotLeader {
105 leader_id: if leader == 0 { None } else { Some(leader) },
106 });
107 }
108
109 raw_node
111 .propose(vec![], data.clone())
112 .map_err(|e| ControllerError::Raft(format!("Failed to propose: {:?}", e)))?;
113
114 Ok(data)
117 }
118
119 pub async fn query(&self, _data: Vec<u8>) -> Result<Vec<u8>> {
121 let raw_node = self.raw_node.read().await;
122
123 if raw_node.raft.state != StateRole::Leader {
125 let leader = raw_node.raft.leader_id;
126 return Err(ControllerError::NotLeader {
127 leader_id: if leader == 0 { None } else { Some(leader) },
128 });
129 }
130
131 Ok(vec![])
134 }
135
136 pub async fn step(&self, message: Message) -> Result<()> {
138 let mut raw_node = self.raw_node.write().await;
139 raw_node
140 .step(message)
141 .map_err(|e| ControllerError::Raft(format!("Failed to step: {:?}", e)))?;
142 Ok(())
143 }
144
145 pub async fn tick(&self) -> Result<()> {
147 let mut raw_node = self.raw_node.write().await;
148 raw_node.tick();
149
150 if raw_node.has_ready() {
152 let mut ready = raw_node.ready();
153
154 if !ready.messages().is_empty() {
156 debug!("Need to send {} messages to peers", ready.messages().len());
158 }
159
160 for entry in ready.take_committed_entries() {
162 if entry.data.is_empty() {
163 continue;
165 }
166 debug!("Applying entry: {:?}", entry);
168 }
169
170 let light_rd = raw_node.advance(ready);
172 if let Some(commit) = light_rd.commit_index() {
173 raw_node.mut_store().wl().commit_to(commit).ok();
174 }
175 raw_node.advance_apply();
176 }
177
178 Ok(())
179 }
180
181 pub async fn is_leader(&self) -> bool {
183 let raw_node = self.raw_node.read().await;
184 raw_node.raft.state == StateRole::Leader
185 }
186
187 pub async fn get_leader(&self) -> Option<u64> {
189 let raw_node = self.raw_node.read().await;
190 let leader = raw_node.raft.leader_id;
191 if leader == 0 {
192 None
193 } else {
194 Some(leader)
195 }
196 }
197}
198
199mod slog_global {
201 use slog::o;
202 use slog::Discard;
203 use slog::Logger;
204
205 pub fn get_global() -> Logger {
206 Logger::root(Discard, o!())
207 }
208}