rocketmq_controller/processor/
broker_processor.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::sync::Arc;
19use std::time::SystemTime;
20
21use tracing::debug;
22use tracing::error;
23use tracing::info;
24
25use crate::error::ControllerError;
26use crate::error::Result;
27use crate::metadata::BrokerInfo;
28use crate::metadata::MetadataStore;
29use crate::processor::request::BrokerHeartbeatRequest;
30use crate::processor::request::BrokerHeartbeatResponse;
31use crate::processor::request::ElectMasterRequest;
32use crate::processor::request::ElectMasterResponse;
33use crate::processor::request::RegisterBrokerRequest;
34use crate::processor::request::RegisterBrokerResponse;
35use crate::processor::request::UnregisterBrokerRequest;
36use crate::processor::request::UnregisterBrokerResponse;
37use crate::processor::RequestProcessor;
38use crate::raft::RaftController;
39
40/// Register broker processor
41pub struct RegisterBrokerProcessor {
42    /// Metadata store
43    metadata: Arc<MetadataStore>,
44
45    /// Raft controller
46    raft: Arc<RaftController>,
47}
48
49impl RegisterBrokerProcessor {
50    /// Create a new register broker processor
51    pub fn new(metadata: Arc<MetadataStore>, raft: Arc<RaftController>) -> Self {
52        Self { metadata, raft }
53    }
54
55    /// Process register broker request
56    pub async fn process_request(
57        &self,
58        request: RegisterBrokerRequest,
59    ) -> Result<RegisterBrokerResponse> {
60        info!(
61            "Processing register broker request: {}",
62            request.broker_name
63        );
64
65        // Check if we are the leader
66        if !self.raft.is_leader().await {
67            let leader = self.raft.get_leader().await;
68            error!("Not leader, current leader: {:?}", leader);
69            return Ok(RegisterBrokerResponse {
70                success: false,
71                error: Some(format!("Not leader, current leader: {:?}", leader)),
72                broker_id: None,
73            });
74        }
75
76        // Create broker info
77        let broker_info = BrokerInfo {
78            name: request.broker_name.clone(),
79            broker_id: request.broker_id,
80            cluster_name: request.cluster_name.clone(),
81            addr: request.broker_addr,
82            last_heartbeat: SystemTime::now(),
83            version: request.version.clone(),
84            role: request.role,
85            metadata: request.metadata.clone(),
86        };
87
88        // Register broker
89        match self.metadata.broker_manager().register(broker_info).await {
90            Ok(()) => {
91                info!("Successfully registered broker: {}", request.broker_name);
92                Ok(RegisterBrokerResponse {
93                    success: true,
94                    error: None,
95                    broker_id: Some(request.broker_id),
96                })
97            }
98            Err(e) => {
99                error!("Failed to register broker {}: {}", request.broker_name, e);
100                Ok(RegisterBrokerResponse {
101                    success: false,
102                    error: Some(e.to_string()),
103                    broker_id: None,
104                })
105            }
106        }
107    }
108}
109
110#[async_trait::async_trait]
111impl RequestProcessor for RegisterBrokerProcessor {
112    async fn process(&self, request: &[u8]) -> Result<Vec<u8>> {
113        // Deserialize request
114        let req: RegisterBrokerRequest = serde_json::from_slice(request)
115            .map_err(|e| ControllerError::InvalidRequest(e.to_string()))?;
116
117        // Process request
118        let response = self.process_request(req).await?;
119
120        // Serialize response
121        serde_json::to_vec(&response)
122            .map_err(|e| ControllerError::SerializationError(e.to_string()))
123    }
124}
125
126/// Unregister broker processor
127pub struct UnregisterBrokerProcessor {
128    /// Metadata store
129    metadata: Arc<MetadataStore>,
130
131    /// Raft controller
132    raft: Arc<RaftController>,
133}
134
135impl UnregisterBrokerProcessor {
136    /// Create a new unregister broker processor
137    pub fn new(metadata: Arc<MetadataStore>, raft: Arc<RaftController>) -> Self {
138        Self { metadata, raft }
139    }
140
141    /// Process unregister broker request
142    pub async fn process_request(
143        &self,
144        request: UnregisterBrokerRequest,
145    ) -> Result<UnregisterBrokerResponse> {
146        info!(
147            "Processing unregister broker request: {}",
148            request.broker_name
149        );
150
151        // Check if we are the leader
152        if !self.raft.is_leader().await {
153            let leader = self.raft.get_leader().await;
154            return Ok(UnregisterBrokerResponse {
155                success: false,
156                error: Some(format!("Not leader, current leader: {:?}", leader)),
157            });
158        }
159
160        // Unregister broker
161        match self
162            .metadata
163            .broker_manager()
164            .unregister(&request.broker_name)
165            .await
166        {
167            Ok(()) => {
168                info!("Successfully unregistered broker: {}", request.broker_name);
169                Ok(UnregisterBrokerResponse {
170                    success: true,
171                    error: None,
172                })
173            }
174            Err(e) => {
175                error!("Failed to unregister broker {}: {}", request.broker_name, e);
176                Ok(UnregisterBrokerResponse {
177                    success: false,
178                    error: Some(e.to_string()),
179                })
180            }
181        }
182    }
183}
184
185#[async_trait::async_trait]
186impl RequestProcessor for UnregisterBrokerProcessor {
187    async fn process(&self, request: &[u8]) -> Result<Vec<u8>> {
188        let req: UnregisterBrokerRequest = serde_json::from_slice(request)
189            .map_err(|e| ControllerError::InvalidRequest(e.to_string()))?;
190
191        let response = self.process_request(req).await?;
192
193        serde_json::to_vec(&response)
194            .map_err(|e| ControllerError::SerializationError(e.to_string()))
195    }
196}
197
198/// Broker heartbeat processor
199pub struct BrokerHeartbeatProcessor {
200    /// Metadata store
201    metadata: Arc<MetadataStore>,
202}
203
204impl BrokerHeartbeatProcessor {
205    /// Create a new broker heartbeat processor
206    pub fn new(metadata: Arc<MetadataStore>) -> Self {
207        Self { metadata }
208    }
209
210    /// Process broker heartbeat request
211    pub async fn process_request(
212        &self,
213        request: BrokerHeartbeatRequest,
214    ) -> Result<BrokerHeartbeatResponse> {
215        debug!("Processing broker heartbeat: {}", request.broker_name);
216
217        // Update heartbeat
218        match self
219            .metadata
220            .broker_manager()
221            .heartbeat(&request.broker_name)
222            .await
223        {
224            Ok(()) => {
225                debug!(
226                    "Successfully updated heartbeat for broker: {}",
227                    request.broker_name
228                );
229                Ok(BrokerHeartbeatResponse {
230                    success: true,
231                    error: None,
232                })
233            }
234            Err(e) => {
235                error!(
236                    "Failed to update heartbeat for broker {}: {}",
237                    request.broker_name, e
238                );
239                Ok(BrokerHeartbeatResponse {
240                    success: false,
241                    error: Some(e.to_string()),
242                })
243            }
244        }
245    }
246}
247
248#[async_trait::async_trait]
249impl RequestProcessor for BrokerHeartbeatProcessor {
250    async fn process(&self, request: &[u8]) -> Result<Vec<u8>> {
251        let req: BrokerHeartbeatRequest = serde_json::from_slice(request)
252            .map_err(|e| ControllerError::InvalidRequest(e.to_string()))?;
253
254        let response = self.process_request(req).await?;
255
256        serde_json::to_vec(&response)
257            .map_err(|e| ControllerError::SerializationError(e.to_string()))
258    }
259}
260
261/// Elect master processor
262pub struct ElectMasterProcessor {
263    /// Metadata store
264    metadata: Arc<MetadataStore>,
265
266    /// Raft controller
267    raft: Arc<RaftController>,
268}
269
270impl ElectMasterProcessor {
271    /// Create a new elect master processor
272    pub fn new(metadata: Arc<MetadataStore>, raft: Arc<RaftController>) -> Self {
273        Self { metadata, raft }
274    }
275
276    /// Process elect master request
277    pub async fn process_request(
278        &self,
279        request: ElectMasterRequest,
280    ) -> Result<ElectMasterResponse> {
281        info!(
282            "Processing elect master request for cluster: {}, broker: {}",
283            request.cluster_name, request.broker_name
284        );
285
286        // Check if we are the leader
287        if !self.raft.is_leader().await {
288            let leader = self.raft.get_leader().await;
289            return Ok(ElectMasterResponse {
290                success: false,
291                error: Some(format!("Not leader, current leader: {:?}", leader)),
292                master_broker: None,
293                master_addr: None,
294            });
295        }
296
297        // Get brokers in the cluster
298        let brokers = self
299            .metadata
300            .broker_manager()
301            .list_brokers_by_cluster(&request.cluster_name)
302            .await;
303
304        if brokers.is_empty() {
305            return Ok(ElectMasterResponse {
306                success: false,
307                error: Some("No brokers found in cluster".to_string()),
308                master_broker: None,
309                master_addr: None,
310            });
311        }
312
313        // Find the master broker (simple logic: first broker with Master role)
314        let master = brokers
315            .iter()
316            .find(|b| b.role == crate::metadata::BrokerRole::Master);
317
318        match master {
319            Some(broker) => {
320                info!("Elected master broker: {}", broker.name);
321                Ok(ElectMasterResponse {
322                    success: true,
323                    error: None,
324                    master_broker: Some(broker.name.clone()),
325                    master_addr: Some(broker.addr),
326                })
327            }
328            None => Ok(ElectMasterResponse {
329                success: false,
330                error: Some("No master broker found in cluster".to_string()),
331                master_broker: None,
332                master_addr: None,
333            }),
334        }
335    }
336}
337
338#[async_trait::async_trait]
339impl RequestProcessor for ElectMasterProcessor {
340    async fn process(&self, request: &[u8]) -> Result<Vec<u8>> {
341        let req: ElectMasterRequest = serde_json::from_slice(request)
342            .map_err(|e| ControllerError::InvalidRequest(e.to_string()))?;
343
344        let response = self.process_request(req).await?;
345
346        serde_json::to_vec(&response)
347            .map_err(|e| ControllerError::SerializationError(e.to_string()))
348    }
349}