// rocketmq_controller/metadata/broker.rs

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::MutexGuard;
use std::time::Duration;
use std::time::SystemTime;

use dashmap::DashMap;
use serde::Deserialize;
use serde::Serialize;
use tokio::task::JoinHandle;
use tokio::time;
use tracing::debug;
use tracing::info;
use tracing::warn;

use crate::config::ControllerConfig;
use crate::error::ControllerError;
use crate::error::Result;

35/// Broker information
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct BrokerInfo {
38    /// Broker name
39    pub name: String,
40
41    /// Broker ID
42    pub broker_id: u64,
43
44    /// Cluster name
45    pub cluster_name: String,
46
47    /// Broker address
48    pub addr: SocketAddr,
49
50    /// Last heartbeat time
51    pub last_heartbeat: SystemTime,
52
53    /// Broker version
54    pub version: String,
55
56    /// Broker role (MASTER, SLAVE)
57    pub role: BrokerRole,
58
59    /// Additional metadata
60    pub metadata: serde_json::Value,
61}
62
63/// Broker role
64#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
65pub enum BrokerRole {
66    Master,
67    Slave,
68}
69
70/// Broker manager
71pub struct BrokerManager {
72    /// Registered brokers: broker_name -> BrokerInfo
73    brokers: Arc<DashMap<String, BrokerInfo>>,
74
75    /// Configuration
76    config: Arc<ControllerConfig>,
77
78    /// Heartbeat timeout duration
79    heartbeat_timeout: Duration,
80}
81
82impl BrokerManager {
83    /// Create a new broker manager
84    pub fn new(config: Arc<ControllerConfig>) -> Self {
85        Self {
86            brokers: Arc::new(DashMap::new()),
87            config,
88            heartbeat_timeout: Duration::from_secs(30),
89        }
90    }
91
92    /// Start the broker manager
93    pub async fn start(&self) -> Result<()> {
94        info!("Starting broker manager");
95
96        // Start heartbeat checker
97        let brokers = self.brokers.clone();
98        let timeout = self.heartbeat_timeout;
99        tokio::spawn(async move {
100            let mut interval = time::interval(Duration::from_secs(5));
101            loop {
102                interval.tick().await;
103                Self::check_heartbeats(&brokers, timeout);
104            }
105        });
106
107        Ok(())
108    }
109
110    /// Shutdown the broker manager
111    pub async fn shutdown(&self) -> Result<()> {
112        info!("Shutting down broker manager");
113        self.brokers.clear();
114        Ok(())
115    }
116
117    /// Register a broker
118    pub async fn register(&self, info: BrokerInfo) -> Result<()> {
119        info!("Registering broker: {} ({})", info.name, info.addr);
120
121        // Validate broker info
122        if info.name.is_empty() {
123            return Err(ControllerError::InvalidRequest(
124                "Broker name cannot be empty".to_string(),
125            ));
126        }
127
128        // Update broker info
129        self.brokers.insert(info.name.clone(), info);
130
131        Ok(())
132    }
133
134    /// Unregister a broker
135    pub async fn unregister(&self, broker_name: &str) -> Result<()> {
136        info!("Unregistering broker: {}", broker_name);
137
138        self.brokers
139            .remove(broker_name)
140            .ok_or_else(|| ControllerError::MetadataNotFound {
141                key: broker_name.to_string(),
142            })?;
143
144        Ok(())
145    }
146
147    /// Update broker heartbeat
148    pub async fn heartbeat(&self, broker_name: &str) -> Result<()> {
149        debug!("Heartbeat from broker: {}", broker_name);
150
151        let mut broker =
152            self.brokers
153                .get_mut(broker_name)
154                .ok_or_else(|| ControllerError::MetadataNotFound {
155                    key: broker_name.to_string(),
156                })?;
157
158        broker.last_heartbeat = SystemTime::now();
159
160        Ok(())
161    }
162
163    /// Get broker information
164    pub async fn get_broker(&self, broker_name: &str) -> Result<BrokerInfo> {
165        self.brokers
166            .get(broker_name)
167            .map(|entry| entry.value().clone())
168            .ok_or_else(|| ControllerError::MetadataNotFound {
169                key: broker_name.to_string(),
170            })
171    }
172
173    /// List all brokers
174    pub async fn list_brokers(&self) -> Vec<BrokerInfo> {
175        self.brokers
176            .iter()
177            .map(|entry| entry.value().clone())
178            .collect()
179    }
180
181    /// List brokers by cluster
182    pub async fn list_brokers_by_cluster(&self, cluster_name: &str) -> Vec<BrokerInfo> {
183        self.brokers
184            .iter()
185            .filter(|entry| entry.value().cluster_name == cluster_name)
186            .map(|entry| entry.value().clone())
187            .collect()
188    }
189
190    /// Check heartbeats and remove stale brokers
191    fn check_heartbeats(brokers: &DashMap<String, BrokerInfo>, timeout: Duration) {
192        let now = SystemTime::now();
193        let mut to_remove = Vec::new();
194
195        for entry in brokers.iter() {
196            let broker = entry.value();
197            if let Ok(elapsed) = now.duration_since(broker.last_heartbeat) {
198                if elapsed > timeout {
199                    warn!(
200                        "Broker {} heartbeat timeout, removing (last: {:?})",
201                        broker.name, elapsed
202                    );
203                    to_remove.push(broker.name.clone());
204                }
205            }
206        }
207
208        for name in to_remove {
209            brokers.remove(&name);
210        }
211    }
212}
213
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a master broker in `cluster` whose last heartbeat is "now".
    fn broker_info(name: &str, cluster: &str) -> BrokerInfo {
        BrokerInfo {
            name: name.to_string(),
            broker_id: 0,
            cluster_name: cluster.to_string(),
            addr: "127.0.0.1:10911".parse().unwrap(),
            last_heartbeat: SystemTime::now(),
            version: "5.0.0".to_string(),
            role: BrokerRole::Master,
            metadata: serde_json::json!({}),
        }
    }

    /// Build a manager backed by the test configuration.
    fn new_manager() -> BrokerManager {
        BrokerManager::new(Arc::new(ControllerConfig::test_config()))
    }

    #[tokio::test]
    async fn test_broker_registration() {
        let manager = new_manager();
        let info = broker_info("broker-a", "DefaultCluster");

        assert!(manager.register(info.clone()).await.is_ok());

        let stored = manager.get_broker("broker-a").await.unwrap();
        assert_eq!(stored.name, info.name);
        assert_eq!(stored.cluster_name, info.cluster_name);
        assert_eq!(stored.addr, info.addr);
        assert_eq!(stored.role, BrokerRole::Master);
    }

    #[tokio::test]
    async fn test_register_rejects_empty_name() {
        let manager = new_manager();

        let result = manager.register(broker_info("", "DefaultCluster")).await;

        assert!(matches!(result, Err(ControllerError::InvalidRequest(_))));
        assert!(manager.list_brokers().await.is_empty());
    }

    #[tokio::test]
    async fn test_unregister() {
        let manager = new_manager();
        manager
            .register(broker_info("broker-a", "DefaultCluster"))
            .await
            .unwrap();

        assert!(manager.unregister("broker-a").await.is_ok());
        assert!(manager.get_broker("broker-a").await.is_err());
        assert!(matches!(
            manager.unregister("broker-a").await,
            Err(ControllerError::MetadataNotFound { .. })
        ));
    }

    #[tokio::test]
    async fn test_heartbeat_refreshes_timestamp() {
        let manager = new_manager();
        let mut info = broker_info("broker-a", "DefaultCluster");
        info.last_heartbeat = SystemTime::UNIX_EPOCH;
        manager.register(info).await.unwrap();

        manager.heartbeat("broker-a").await.unwrap();
        let stored = manager.get_broker("broker-a").await.unwrap();
        assert!(stored.last_heartbeat > SystemTime::UNIX_EPOCH);

        assert!(matches!(
            manager.heartbeat("unknown").await,
            Err(ControllerError::MetadataNotFound { .. })
        ));
    }

    #[tokio::test]
    async fn test_list_brokers_by_cluster() {
        let manager = new_manager();
        manager
            .register(broker_info("broker-a", "ClusterA"))
            .await
            .unwrap();
        manager
            .register(broker_info("broker-b", "ClusterB"))
            .await
            .unwrap();

        let in_a = manager.list_brokers_by_cluster("ClusterA").await;
        assert_eq!(in_a.len(), 1);
        assert_eq!(in_a[0].name, "broker-a");
        assert_eq!(manager.list_brokers().await.len(), 2);
        assert!(manager.list_brokers_by_cluster("Missing").await.is_empty());
    }

    #[tokio::test]
    async fn test_stale_brokers_are_evicted() {
        let manager = new_manager();
        let mut stale = broker_info("stale", "DefaultCluster");
        stale.last_heartbeat = SystemTime::now() - Duration::from_secs(60);
        manager.register(stale).await.unwrap();
        manager
            .register(broker_info("fresh", "DefaultCluster"))
            .await
            .unwrap();

        BrokerManager::check_heartbeats(&manager.brokers, Duration::from_secs(30));

        assert!(manager.get_broker("stale").await.is_err());
        assert!(manager.get_broker("fresh").await.is_ok());
    }

    #[tokio::test]
    async fn test_start_and_shutdown() {
        let manager = new_manager();
        manager.start().await.unwrap();
        manager
            .register(broker_info("broker-a", "DefaultCluster"))
            .await
            .unwrap();

        manager.shutdown().await.unwrap();

        assert!(manager.list_brokers().await.is_empty());
    }
}