rocketmq_controller/processor/
broker_processor.rs1use std::sync::Arc;
19use std::time::SystemTime;
20
21use tracing::debug;
22use tracing::error;
23use tracing::info;
24
25use crate::error::ControllerError;
26use crate::error::Result;
27use crate::metadata::BrokerInfo;
28use crate::metadata::MetadataStore;
29use crate::processor::request::BrokerHeartbeatRequest;
30use crate::processor::request::BrokerHeartbeatResponse;
31use crate::processor::request::ElectMasterRequest;
32use crate::processor::request::ElectMasterResponse;
33use crate::processor::request::RegisterBrokerRequest;
34use crate::processor::request::RegisterBrokerResponse;
35use crate::processor::request::UnregisterBrokerRequest;
36use crate::processor::request::UnregisterBrokerResponse;
37use crate::processor::RequestProcessor;
38use crate::raft::RaftController;
39
40pub struct RegisterBrokerProcessor {
42 metadata: Arc<MetadataStore>,
44
45 raft: Arc<RaftController>,
47}
48
49impl RegisterBrokerProcessor {
50 pub fn new(metadata: Arc<MetadataStore>, raft: Arc<RaftController>) -> Self {
52 Self { metadata, raft }
53 }
54
55 pub async fn process_request(
57 &self,
58 request: RegisterBrokerRequest,
59 ) -> Result<RegisterBrokerResponse> {
60 info!(
61 "Processing register broker request: {}",
62 request.broker_name
63 );
64
65 if !self.raft.is_leader().await {
67 let leader = self.raft.get_leader().await;
68 error!("Not leader, current leader: {:?}", leader);
69 return Ok(RegisterBrokerResponse {
70 success: false,
71 error: Some(format!("Not leader, current leader: {:?}", leader)),
72 broker_id: None,
73 });
74 }
75
76 let broker_info = BrokerInfo {
78 name: request.broker_name.clone(),
79 broker_id: request.broker_id,
80 cluster_name: request.cluster_name.clone(),
81 addr: request.broker_addr,
82 last_heartbeat: SystemTime::now(),
83 version: request.version.clone(),
84 role: request.role,
85 metadata: request.metadata.clone(),
86 };
87
88 match self.metadata.broker_manager().register(broker_info).await {
90 Ok(()) => {
91 info!("Successfully registered broker: {}", request.broker_name);
92 Ok(RegisterBrokerResponse {
93 success: true,
94 error: None,
95 broker_id: Some(request.broker_id),
96 })
97 }
98 Err(e) => {
99 error!("Failed to register broker {}: {}", request.broker_name, e);
100 Ok(RegisterBrokerResponse {
101 success: false,
102 error: Some(e.to_string()),
103 broker_id: None,
104 })
105 }
106 }
107 }
108}
109
110#[async_trait::async_trait]
111impl RequestProcessor for RegisterBrokerProcessor {
112 async fn process(&self, request: &[u8]) -> Result<Vec<u8>> {
113 let req: RegisterBrokerRequest = serde_json::from_slice(request)
115 .map_err(|e| ControllerError::InvalidRequest(e.to_string()))?;
116
117 let response = self.process_request(req).await?;
119
120 serde_json::to_vec(&response)
122 .map_err(|e| ControllerError::SerializationError(e.to_string()))
123 }
124}
125
126pub struct UnregisterBrokerProcessor {
128 metadata: Arc<MetadataStore>,
130
131 raft: Arc<RaftController>,
133}
134
135impl UnregisterBrokerProcessor {
136 pub fn new(metadata: Arc<MetadataStore>, raft: Arc<RaftController>) -> Self {
138 Self { metadata, raft }
139 }
140
141 pub async fn process_request(
143 &self,
144 request: UnregisterBrokerRequest,
145 ) -> Result<UnregisterBrokerResponse> {
146 info!(
147 "Processing unregister broker request: {}",
148 request.broker_name
149 );
150
151 if !self.raft.is_leader().await {
153 let leader = self.raft.get_leader().await;
154 return Ok(UnregisterBrokerResponse {
155 success: false,
156 error: Some(format!("Not leader, current leader: {:?}", leader)),
157 });
158 }
159
160 match self
162 .metadata
163 .broker_manager()
164 .unregister(&request.broker_name)
165 .await
166 {
167 Ok(()) => {
168 info!("Successfully unregistered broker: {}", request.broker_name);
169 Ok(UnregisterBrokerResponse {
170 success: true,
171 error: None,
172 })
173 }
174 Err(e) => {
175 error!("Failed to unregister broker {}: {}", request.broker_name, e);
176 Ok(UnregisterBrokerResponse {
177 success: false,
178 error: Some(e.to_string()),
179 })
180 }
181 }
182 }
183}
184
185#[async_trait::async_trait]
186impl RequestProcessor for UnregisterBrokerProcessor {
187 async fn process(&self, request: &[u8]) -> Result<Vec<u8>> {
188 let req: UnregisterBrokerRequest = serde_json::from_slice(request)
189 .map_err(|e| ControllerError::InvalidRequest(e.to_string()))?;
190
191 let response = self.process_request(req).await?;
192
193 serde_json::to_vec(&response)
194 .map_err(|e| ControllerError::SerializationError(e.to_string()))
195 }
196}
197
198pub struct BrokerHeartbeatProcessor {
200 metadata: Arc<MetadataStore>,
202}
203
204impl BrokerHeartbeatProcessor {
205 pub fn new(metadata: Arc<MetadataStore>) -> Self {
207 Self { metadata }
208 }
209
210 pub async fn process_request(
212 &self,
213 request: BrokerHeartbeatRequest,
214 ) -> Result<BrokerHeartbeatResponse> {
215 debug!("Processing broker heartbeat: {}", request.broker_name);
216
217 match self
219 .metadata
220 .broker_manager()
221 .heartbeat(&request.broker_name)
222 .await
223 {
224 Ok(()) => {
225 debug!(
226 "Successfully updated heartbeat for broker: {}",
227 request.broker_name
228 );
229 Ok(BrokerHeartbeatResponse {
230 success: true,
231 error: None,
232 })
233 }
234 Err(e) => {
235 error!(
236 "Failed to update heartbeat for broker {}: {}",
237 request.broker_name, e
238 );
239 Ok(BrokerHeartbeatResponse {
240 success: false,
241 error: Some(e.to_string()),
242 })
243 }
244 }
245 }
246}
247
248#[async_trait::async_trait]
249impl RequestProcessor for BrokerHeartbeatProcessor {
250 async fn process(&self, request: &[u8]) -> Result<Vec<u8>> {
251 let req: BrokerHeartbeatRequest = serde_json::from_slice(request)
252 .map_err(|e| ControllerError::InvalidRequest(e.to_string()))?;
253
254 let response = self.process_request(req).await?;
255
256 serde_json::to_vec(&response)
257 .map_err(|e| ControllerError::SerializationError(e.to_string()))
258 }
259}
260
261pub struct ElectMasterProcessor {
263 metadata: Arc<MetadataStore>,
265
266 raft: Arc<RaftController>,
268}
269
270impl ElectMasterProcessor {
271 pub fn new(metadata: Arc<MetadataStore>, raft: Arc<RaftController>) -> Self {
273 Self { metadata, raft }
274 }
275
276 pub async fn process_request(
278 &self,
279 request: ElectMasterRequest,
280 ) -> Result<ElectMasterResponse> {
281 info!(
282 "Processing elect master request for cluster: {}, broker: {}",
283 request.cluster_name, request.broker_name
284 );
285
286 if !self.raft.is_leader().await {
288 let leader = self.raft.get_leader().await;
289 return Ok(ElectMasterResponse {
290 success: false,
291 error: Some(format!("Not leader, current leader: {:?}", leader)),
292 master_broker: None,
293 master_addr: None,
294 });
295 }
296
297 let brokers = self
299 .metadata
300 .broker_manager()
301 .list_brokers_by_cluster(&request.cluster_name)
302 .await;
303
304 if brokers.is_empty() {
305 return Ok(ElectMasterResponse {
306 success: false,
307 error: Some("No brokers found in cluster".to_string()),
308 master_broker: None,
309 master_addr: None,
310 });
311 }
312
313 let master = brokers
315 .iter()
316 .find(|b| b.role == crate::metadata::BrokerRole::Master);
317
318 match master {
319 Some(broker) => {
320 info!("Elected master broker: {}", broker.name);
321 Ok(ElectMasterResponse {
322 success: true,
323 error: None,
324 master_broker: Some(broker.name.clone()),
325 master_addr: Some(broker.addr),
326 })
327 }
328 None => Ok(ElectMasterResponse {
329 success: false,
330 error: Some("No master broker found in cluster".to_string()),
331 master_broker: None,
332 master_addr: None,
333 }),
334 }
335 }
336}
337
338#[async_trait::async_trait]
339impl RequestProcessor for ElectMasterProcessor {
340 async fn process(&self, request: &[u8]) -> Result<Vec<u8>> {
341 let req: ElectMasterRequest = serde_json::from_slice(request)
342 .map_err(|e| ControllerError::InvalidRequest(e.to_string()))?;
343
344 let response = self.process_request(req).await?;
345
346 serde_json::to_vec(&response)
347 .map_err(|e| ControllerError::SerializationError(e.to_string()))
348 }
349}