// rocketmq_controller/metadata/replica.rs

use std::collections::HashMap;
use std::sync::Arc;
use std::time::SystemTime;

use dashmap::DashMap;
use serde::Deserialize;
use serde::Serialize;
use tracing::debug;
use tracing::info;
use tracing::warn;

use crate::config::ControllerConfig;
use crate::error::Result;

32#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
34pub enum ReplicaRole {
35 Master,
37 Slave,
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct BrokerReplicaInfo {
50 pub cluster_name: String,
52
53 pub broker_name: String,
55
56 pub broker_id: u64,
58
59 pub broker_addr: String,
61
62 pub role: ReplicaRole,
64
65 pub epoch: u64,
67
68 pub max_offset: i64,
70
71 pub last_sync_timestamp: u64,
73
74 pub in_sync: bool,
76}
77
78impl BrokerReplicaInfo {
79 pub fn new_master(
81 cluster_name: String,
82 broker_name: String,
83 broker_id: u64,
84 broker_addr: String,
85 epoch: u64,
86 ) -> Self {
87 Self {
88 cluster_name,
89 broker_name,
90 broker_id,
91 broker_addr,
92 role: ReplicaRole::Master,
93 epoch,
94 max_offset: 0,
95 last_sync_timestamp: SystemTime::now()
96 .duration_since(SystemTime::UNIX_EPOCH)
97 .unwrap()
98 .as_secs(),
99 in_sync: true,
100 }
101 }
102
103 pub fn new_slave(
105 cluster_name: String,
106 broker_name: String,
107 broker_id: u64,
108 broker_addr: String,
109 ) -> Self {
110 Self {
111 cluster_name,
112 broker_name,
113 broker_id,
114 broker_addr,
115 role: ReplicaRole::Slave,
116 epoch: 0,
117 max_offset: 0,
118 last_sync_timestamp: SystemTime::now()
119 .duration_since(SystemTime::UNIX_EPOCH)
120 .unwrap()
121 .as_secs(),
122 in_sync: false,
123 }
124 }
125
126 pub fn is_master(&self) -> bool {
128 self.role == ReplicaRole::Master
129 }
130
131 pub fn is_in_sync(&self) -> bool {
133 self.in_sync
134 }
135
136 pub fn replica_id(&self) -> String {
138 format!("{}:{}", self.broker_name, self.broker_id)
139 }
140}
141
142#[derive(Debug, Clone, Serialize, Deserialize)]
147pub struct SyncStateSet {
148 pub broker_name: String,
150
151 pub master_broker_id: u64,
153
154 pub master_addr: String,
156
157 pub master_epoch: u64,
159
160 pub sync_state_set: Vec<u64>,
162
163 pub last_update_timestamp: u64,
165}
166
167impl SyncStateSet {
168 pub fn new(broker_name: String, master_broker_id: u64, master_addr: String) -> Self {
170 Self {
171 broker_name,
172 master_broker_id,
173 master_addr,
174 master_epoch: 0,
175 sync_state_set: vec![master_broker_id],
176 last_update_timestamp: SystemTime::now()
177 .duration_since(SystemTime::UNIX_EPOCH)
178 .unwrap()
179 .as_secs(),
180 }
181 }
182
183 pub fn contains(&self, broker_id: u64) -> bool {
185 self.sync_state_set.contains(&broker_id)
186 }
187
188 pub fn add_broker(&mut self, broker_id: u64) {
190 if !self.sync_state_set.contains(&broker_id) {
191 self.sync_state_set.push(broker_id);
192 self.last_update_timestamp = SystemTime::now()
193 .duration_since(SystemTime::UNIX_EPOCH)
194 .unwrap()
195 .as_secs();
196 }
197 }
198
199 pub fn remove_broker(&mut self, broker_id: u64) {
201 self.sync_state_set.retain(|&id| id != broker_id);
202 self.last_update_timestamp = SystemTime::now()
203 .duration_since(SystemTime::UNIX_EPOCH)
204 .unwrap()
205 .as_secs();
206 }
207
208 pub fn size(&self) -> usize {
210 self.sync_state_set.len()
211 }
212
213 pub fn is_master(&self, broker_id: u64) -> bool {
215 self.master_broker_id == broker_id
216 }
217}
218
219pub struct ReplicasManager {
227 config: Arc<ControllerConfig>,
229
230 replicas: Arc<DashMap<String, HashMap<u64, BrokerReplicaInfo>>>,
232
233 sync_state_sets: Arc<DashMap<String, SyncStateSet>>,
235}
236
237impl ReplicasManager {
238 pub fn new(config: Arc<ControllerConfig>) -> Self {
240 Self {
241 config,
242 replicas: Arc::new(DashMap::new()),
243 sync_state_sets: Arc::new(DashMap::new()),
244 }
245 }
246
247 pub async fn start(&self) -> Result<()> {
249 info!("Starting replicas manager");
250 Ok(())
251 }
252
253 pub async fn shutdown(&self) -> Result<()> {
255 info!("Shutting down replicas manager");
256 self.replicas.clear();
257 self.sync_state_sets.clear();
258 Ok(())
259 }
260
261 pub async fn register_replica(&self, replica: BrokerReplicaInfo) -> Result<()> {
263 let broker_name = replica.broker_name.clone();
264 let broker_id = replica.broker_id;
265 let is_master = replica.is_master();
266
267 debug!(
268 "Registering replica: {}:{} (role={:?})",
269 broker_name, broker_id, replica.role
270 );
271
272 self.replicas
274 .entry(broker_name.clone())
275 .or_default()
276 .insert(broker_id, replica.clone());
277
278 if is_master {
280 let mut sync_state_set =
281 SyncStateSet::new(broker_name.clone(), broker_id, replica.broker_addr.clone());
282 sync_state_set.master_epoch = replica.epoch;
283 self.sync_state_sets.insert(broker_name, sync_state_set);
284 }
285
286 Ok(())
287 }
288
289 pub async fn unregister_replica(&self, broker_name: &str, broker_id: u64) -> Result<()> {
291 debug!("Unregistering replica: {}:{}", broker_name, broker_id);
292
293 if let Some(mut replicas) = self.replicas.get_mut(broker_name) {
295 replicas.remove(&broker_id);
296 if replicas.is_empty() {
297 drop(replicas);
298 self.replicas.remove(broker_name);
299 }
300 }
301
302 if let Some(mut sync_state_set) = self.sync_state_sets.get_mut(broker_name) {
304 sync_state_set.remove_broker(broker_id);
305 }
306
307 Ok(())
308 }
309
310 pub async fn get_master(&self, broker_name: &str) -> Option<BrokerReplicaInfo> {
312 self.replicas.get(broker_name).and_then(|replicas| {
313 replicas
314 .values()
315 .find(|replica| replica.is_master())
316 .cloned()
317 })
318 }
319
320 pub async fn get_replicas(&self, broker_name: &str) -> Vec<BrokerReplicaInfo> {
322 self.replicas
323 .get(broker_name)
324 .map(|replicas| replicas.values().cloned().collect())
325 .unwrap_or_default()
326 }
327
328 pub async fn get_replica(
330 &self,
331 broker_name: &str,
332 broker_id: u64,
333 ) -> Option<BrokerReplicaInfo> {
334 self.replicas
335 .get(broker_name)
336 .and_then(|replicas| replicas.get(&broker_id).cloned())
337 }
338
339 pub async fn get_sync_state_set(&self, broker_name: &str) -> Option<SyncStateSet> {
341 self.sync_state_sets.get(broker_name).map(|s| s.clone())
342 }
343
344 pub async fn add_to_sync_state_set(&self, broker_name: &str, broker_id: u64) -> Result<()> {
346 if let Some(mut sync_state_set) = self.sync_state_sets.get_mut(broker_name) {
347 sync_state_set.add_broker(broker_id);
348 debug!(
349 "Added broker {} to sync state set for {}",
350 broker_id, broker_name
351 );
352 }
353
354 if let Some(mut replicas) = self.replicas.get_mut(broker_name) {
356 if let Some(replica) = replicas.get_mut(&broker_id) {
357 replica.in_sync = true;
358 }
359 }
360
361 Ok(())
362 }
363
364 pub async fn remove_from_sync_state_set(
366 &self,
367 broker_name: &str,
368 broker_id: u64,
369 ) -> Result<()> {
370 if let Some(mut sync_state_set) = self.sync_state_sets.get_mut(broker_name) {
371 sync_state_set.remove_broker(broker_id);
372 warn!(
373 "Removed broker {} from sync state set for {}",
374 broker_id, broker_name
375 );
376 }
377
378 if let Some(mut replicas) = self.replicas.get_mut(broker_name) {
380 if let Some(replica) = replicas.get_mut(&broker_id) {
381 replica.in_sync = false;
382 }
383 }
384
385 Ok(())
386 }
387
388 pub async fn update_sync_state_set(
390 &self,
391 broker_name: &str,
392 new_sync_state_set: Vec<u64>,
393 ) -> Result<()> {
394 if let Some(mut sync_state_set) = self.sync_state_sets.get_mut(broker_name) {
395 sync_state_set.sync_state_set = new_sync_state_set.clone();
396 sync_state_set.last_update_timestamp = SystemTime::now()
397 .duration_since(SystemTime::UNIX_EPOCH)
398 .unwrap()
399 .as_secs();
400
401 debug!(
402 "Updated sync state set for {}: {:?}",
403 broker_name, new_sync_state_set
404 );
405 }
406
407 if let Some(mut replicas) = self.replicas.get_mut(broker_name) {
409 for (broker_id, replica) in replicas.iter_mut() {
410 replica.in_sync = new_sync_state_set.contains(broker_id);
411 }
412 }
413
414 Ok(())
415 }
416
417 pub async fn elect_master(&self, broker_name: &str) -> Result<Option<BrokerReplicaInfo>> {
422 info!("Electing new master for broker set: {}", broker_name);
423
424 let sync_state_set = match self.sync_state_sets.get(broker_name) {
426 Some(s) => s.clone(),
427 None => {
428 warn!("No sync state set found for {}", broker_name);
429 return Ok(None);
430 }
431 };
432
433 let new_master_id = sync_state_set
435 .sync_state_set
436 .iter()
437 .find(|&&id| id != sync_state_set.master_broker_id)
438 .copied();
439
440 let new_master_id = match new_master_id {
441 Some(id) => id,
442 None => {
443 warn!("No in-sync slaves available for {}", broker_name);
444 return Ok(None);
445 }
446 };
447
448 let mut new_master = None;
450 if let Some(mut replicas) = self.replicas.get_mut(broker_name) {
451 if let Some(old_master) = replicas.get_mut(&sync_state_set.master_broker_id) {
453 old_master.role = ReplicaRole::Slave;
454 }
455
456 if let Some(replica) = replicas.get_mut(&new_master_id) {
458 replica.role = ReplicaRole::Master;
459 replica.epoch += 1;
460 new_master = Some(replica.clone());
461
462 info!(
463 "Elected new master for {}: {} (epoch={})",
464 broker_name, new_master_id, replica.epoch
465 );
466 }
467 }
468
469 if let (Some(master), Some(mut sync_state_set)) = (
471 new_master.as_ref(),
472 self.sync_state_sets.get_mut(broker_name),
473 ) {
474 sync_state_set.master_broker_id = new_master_id;
475 sync_state_set.master_addr = master.broker_addr.clone();
476 sync_state_set.master_epoch = master.epoch;
477 sync_state_set.last_update_timestamp = SystemTime::now()
478 .duration_since(SystemTime::UNIX_EPOCH)
479 .unwrap()
480 .as_secs();
481 }
482
483 Ok(new_master)
484 }
485
486 pub async fn list_broker_sets(&self) -> Vec<String> {
488 self.replicas
489 .iter()
490 .map(|entry| entry.key().clone())
491 .collect()
492 }
493
494 pub async fn get_stats(&self) -> HashMap<String, usize> {
496 let mut stats = HashMap::new();
497 stats.insert("broker_sets".to_string(), self.replicas.len());
498 stats.insert("sync_state_sets".to_string(), self.sync_state_sets.len());
499
500 let total_replicas: usize = self.replicas.iter().map(|entry| entry.value().len()).sum();
501 stats.insert("total_replicas".to_string(), total_replicas);
502
503 stats
504 }
505}
506
#[cfg(test)]
mod tests {
    use super::*;

    const CLUSTER: &str = "test-cluster";
    const BROKER_SET: &str = "broker-a";
    const MASTER_ADDR: &str = "127.0.0.1:10911";
    const SLAVE_ADDR: &str = "127.0.0.1:10912";

    /// Master replica `broker-a:0` at epoch 1.
    fn sample_master() -> BrokerReplicaInfo {
        BrokerReplicaInfo::new_master(
            CLUSTER.to_string(),
            BROKER_SET.to_string(),
            0,
            MASTER_ADDR.to_string(),
            1,
        )
    }

    /// Slave replica `broker-a:1`.
    fn sample_slave() -> BrokerReplicaInfo {
        BrokerReplicaInfo::new_slave(
            CLUSTER.to_string(),
            BROKER_SET.to_string(),
            1,
            SLAVE_ADDR.to_string(),
        )
    }

    #[test]
    fn test_broker_replica_info() {
        let master = sample_master();
        assert!(master.is_master());
        assert!(master.is_in_sync());
        assert_eq!(master.replica_id(), "broker-a:0");
        assert_eq!(master.epoch, 1);

        let slave = sample_slave();
        assert!(!slave.is_master());
        assert!(!slave.is_in_sync());
        assert_eq!(slave.replica_id(), "broker-a:1");
        assert_eq!(slave.epoch, 0);
    }

    #[test]
    fn test_sync_state_set() {
        let mut state = SyncStateSet::new(BROKER_SET.to_string(), 0, MASTER_ADDR.to_string());

        // Freshly created: only the master is a member.
        assert_eq!(state.size(), 1);
        assert!(state.contains(0));
        assert!(state.is_master(0));

        state.add_broker(1);
        assert_eq!(state.size(), 2);
        assert!(state.contains(1));
        assert!(!state.is_master(1));

        state.remove_broker(1);
        assert_eq!(state.size(), 1);
        assert!(!state.contains(1));
    }

    #[tokio::test]
    async fn test_replicas_manager() {
        let config = Arc::new(ControllerConfig::new(1, "127.0.0.1:9876".parse().unwrap()));
        let manager = ReplicasManager::new(config);

        manager.register_replica(sample_master()).await.unwrap();
        manager.register_replica(sample_slave()).await.unwrap();

        assert_eq!(manager.get_replicas(BROKER_SET).await.len(), 2);

        let current_master = manager
            .get_master(BROKER_SET)
            .await
            .expect("registered master should be found");
        assert_eq!(current_master.broker_id, 0);

        manager.add_to_sync_state_set(BROKER_SET, 1).await.unwrap();

        let elected = manager
            .elect_master(BROKER_SET)
            .await
            .unwrap()
            .expect("in-sync slave should be promoted");
        assert_eq!(elected.broker_id, 1);
    }
}