rocketmq_controller/metadata/
broker.rs1use std::net::SocketAddr;
19use std::sync::Arc;
20use std::time::Duration;
21use std::time::SystemTime;
22
23use dashmap::DashMap;
24use serde::Deserialize;
25use serde::Serialize;
26use tokio::time;
27use tracing::debug;
28use tracing::info;
29use tracing::warn;
30
31use crate::config::ControllerConfig;
32use crate::error::ControllerError;
33use crate::error::Result;
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct BrokerInfo {
38 pub name: String,
40
41 pub broker_id: u64,
43
44 pub cluster_name: String,
46
47 pub addr: SocketAddr,
49
50 pub last_heartbeat: SystemTime,
52
53 pub version: String,
55
56 pub role: BrokerRole,
58
59 pub metadata: serde_json::Value,
61}
62
63#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
65pub enum BrokerRole {
66 Master,
67 Slave,
68}
69
70pub struct BrokerManager {
72 brokers: Arc<DashMap<String, BrokerInfo>>,
74
75 config: Arc<ControllerConfig>,
77
78 heartbeat_timeout: Duration,
80}
81
82impl BrokerManager {
83 pub fn new(config: Arc<ControllerConfig>) -> Self {
85 Self {
86 brokers: Arc::new(DashMap::new()),
87 config,
88 heartbeat_timeout: Duration::from_secs(30),
89 }
90 }
91
92 pub async fn start(&self) -> Result<()> {
94 info!("Starting broker manager");
95
96 let brokers = self.brokers.clone();
98 let timeout = self.heartbeat_timeout;
99 tokio::spawn(async move {
100 let mut interval = time::interval(Duration::from_secs(5));
101 loop {
102 interval.tick().await;
103 Self::check_heartbeats(&brokers, timeout);
104 }
105 });
106
107 Ok(())
108 }
109
110 pub async fn shutdown(&self) -> Result<()> {
112 info!("Shutting down broker manager");
113 self.brokers.clear();
114 Ok(())
115 }
116
117 pub async fn register(&self, info: BrokerInfo) -> Result<()> {
119 info!("Registering broker: {} ({})", info.name, info.addr);
120
121 if info.name.is_empty() {
123 return Err(ControllerError::InvalidRequest(
124 "Broker name cannot be empty".to_string(),
125 ));
126 }
127
128 self.brokers.insert(info.name.clone(), info);
130
131 Ok(())
132 }
133
134 pub async fn unregister(&self, broker_name: &str) -> Result<()> {
136 info!("Unregistering broker: {}", broker_name);
137
138 self.brokers
139 .remove(broker_name)
140 .ok_or_else(|| ControllerError::MetadataNotFound {
141 key: broker_name.to_string(),
142 })?;
143
144 Ok(())
145 }
146
147 pub async fn heartbeat(&self, broker_name: &str) -> Result<()> {
149 debug!("Heartbeat from broker: {}", broker_name);
150
151 let mut broker =
152 self.brokers
153 .get_mut(broker_name)
154 .ok_or_else(|| ControllerError::MetadataNotFound {
155 key: broker_name.to_string(),
156 })?;
157
158 broker.last_heartbeat = SystemTime::now();
159
160 Ok(())
161 }
162
163 pub async fn get_broker(&self, broker_name: &str) -> Result<BrokerInfo> {
165 self.brokers
166 .get(broker_name)
167 .map(|entry| entry.value().clone())
168 .ok_or_else(|| ControllerError::MetadataNotFound {
169 key: broker_name.to_string(),
170 })
171 }
172
173 pub async fn list_brokers(&self) -> Vec<BrokerInfo> {
175 self.brokers
176 .iter()
177 .map(|entry| entry.value().clone())
178 .collect()
179 }
180
181 pub async fn list_brokers_by_cluster(&self, cluster_name: &str) -> Vec<BrokerInfo> {
183 self.brokers
184 .iter()
185 .filter(|entry| entry.value().cluster_name == cluster_name)
186 .map(|entry| entry.value().clone())
187 .collect()
188 }
189
190 fn check_heartbeats(brokers: &DashMap<String, BrokerInfo>, timeout: Duration) {
192 let now = SystemTime::now();
193 let mut to_remove = Vec::new();
194
195 for entry in brokers.iter() {
196 let broker = entry.value();
197 if let Ok(elapsed) = now.duration_since(broker.last_heartbeat) {
198 if elapsed > timeout {
199 warn!(
200 "Broker {} heartbeat timeout, removing (last: {:?})",
201 broker.name, elapsed
202 );
203 to_remove.push(broker.name.clone());
204 }
205 }
206 }
207
208 for name in to_remove {
209 brokers.remove(&name);
210 }
211 }
212}
213
214#[cfg(test)]
215mod tests {
216 use super::*;
217
218 #[tokio::test]
219 async fn test_broker_registration() {
220 let config = Arc::new(ControllerConfig::test_config());
221
222 let manager = BrokerManager::new(config);
223
224 let info = BrokerInfo {
225 name: "broker-a".to_string(),
226 broker_id: 0,
227 cluster_name: "DefaultCluster".to_string(),
228 addr: "127.0.0.1:10911".parse().unwrap(),
229 last_heartbeat: SystemTime::now(),
230 version: "5.0.0".to_string(),
231 role: BrokerRole::Master,
232 metadata: serde_json::json!({}),
233 };
234
235 assert!(manager.register(info.clone()).await.is_ok());
236 assert!(manager.get_broker("broker-a").await.is_ok());
237 }
238}