rocketmq_controller/metadata/replica.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::collections::HashMap;
19use std::sync::Arc;
20use std::time::SystemTime;
21
22use dashmap::DashMap;
23use serde::Deserialize;
24use serde::Serialize;
25use tracing::debug;
26use tracing::info;
27use tracing::warn;
28
29use crate::config::ControllerConfig;
30use crate::error::Result;
31
32/// Replica role
33#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
34pub enum ReplicaRole {
35    /// Master replica
36    Master,
37    /// Slave replica
38    Slave,
39}
40
41/// Broker replica information
42///
43/// Tracks the state of a broker replica including:
44/// - Role (master/slave)
45/// - Epoch (version number for master election)
46/// - Sync state (whether replica is in-sync)
47/// - Offset tracking
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct BrokerReplicaInfo {
50    /// Cluster name
51    pub cluster_name: String,
52
53    /// Broker name (identifies the broker set)
54    pub broker_name: String,
55
56    /// Broker ID (0 for master, >0 for slaves)
57    pub broker_id: u64,
58
59    /// Broker address
60    pub broker_addr: String,
61
62    /// Replica role
63    pub role: ReplicaRole,
64
65    /// Epoch number (incremented on master election)
66    pub epoch: u64,
67
68    /// Maximum offset
69    pub max_offset: i64,
70
71    /// Last sync timestamp
72    pub last_sync_timestamp: u64,
73
74    /// Whether this replica is in-sync
75    pub in_sync: bool,
76}
77
78impl BrokerReplicaInfo {
79    /// Create a new master replica
80    pub fn new_master(
81        cluster_name: String,
82        broker_name: String,
83        broker_id: u64,
84        broker_addr: String,
85        epoch: u64,
86    ) -> Self {
87        Self {
88            cluster_name,
89            broker_name,
90            broker_id,
91            broker_addr,
92            role: ReplicaRole::Master,
93            epoch,
94            max_offset: 0,
95            last_sync_timestamp: SystemTime::now()
96                .duration_since(SystemTime::UNIX_EPOCH)
97                .unwrap()
98                .as_secs(),
99            in_sync: true,
100        }
101    }
102
103    /// Create a new slave replica
104    pub fn new_slave(
105        cluster_name: String,
106        broker_name: String,
107        broker_id: u64,
108        broker_addr: String,
109    ) -> Self {
110        Self {
111            cluster_name,
112            broker_name,
113            broker_id,
114            broker_addr,
115            role: ReplicaRole::Slave,
116            epoch: 0,
117            max_offset: 0,
118            last_sync_timestamp: SystemTime::now()
119                .duration_since(SystemTime::UNIX_EPOCH)
120                .unwrap()
121                .as_secs(),
122            in_sync: false,
123        }
124    }
125
126    /// Check if this is a master replica
127    pub fn is_master(&self) -> bool {
128        self.role == ReplicaRole::Master
129    }
130
131    /// Check if this replica is in-sync
132    pub fn is_in_sync(&self) -> bool {
133        self.in_sync
134    }
135
136    /// Get replica ID (broker_name:broker_id)
137    pub fn replica_id(&self) -> String {
138        format!("{}:{}", self.broker_name, self.broker_id)
139    }
140}
141
142/// Sync state set (In-Sync Replicas)
143///
144/// Tracks the set of replicas that are considered in-sync with the master.
145/// Similar to Kafka's ISR (In-Sync Replica) concept.
146#[derive(Debug, Clone, Serialize, Deserialize)]
147pub struct SyncStateSet {
148    /// Broker name (identifies the broker set)
149    pub broker_name: String,
150
151    /// Master broker ID
152    pub master_broker_id: u64,
153
154    /// Master address
155    pub master_addr: String,
156
157    /// Master epoch
158    pub master_epoch: u64,
159
160    /// Sync state set (list of in-sync broker IDs)
161    pub sync_state_set: Vec<u64>,
162
163    /// Last update timestamp
164    pub last_update_timestamp: u64,
165}
166
167impl SyncStateSet {
168    /// Create a new sync state set
169    pub fn new(broker_name: String, master_broker_id: u64, master_addr: String) -> Self {
170        Self {
171            broker_name,
172            master_broker_id,
173            master_addr,
174            master_epoch: 0,
175            sync_state_set: vec![master_broker_id],
176            last_update_timestamp: SystemTime::now()
177                .duration_since(SystemTime::UNIX_EPOCH)
178                .unwrap()
179                .as_secs(),
180        }
181    }
182
183    /// Check if a broker is in the sync state set
184    pub fn contains(&self, broker_id: u64) -> bool {
185        self.sync_state_set.contains(&broker_id)
186    }
187
188    /// Add a broker to the sync state set
189    pub fn add_broker(&mut self, broker_id: u64) {
190        if !self.sync_state_set.contains(&broker_id) {
191            self.sync_state_set.push(broker_id);
192            self.last_update_timestamp = SystemTime::now()
193                .duration_since(SystemTime::UNIX_EPOCH)
194                .unwrap()
195                .as_secs();
196        }
197    }
198
199    /// Remove a broker from the sync state set
200    pub fn remove_broker(&mut self, broker_id: u64) {
201        self.sync_state_set.retain(|&id| id != broker_id);
202        self.last_update_timestamp = SystemTime::now()
203            .duration_since(SystemTime::UNIX_EPOCH)
204            .unwrap()
205            .as_secs();
206    }
207
208    /// Get the size of the sync state set
209    pub fn size(&self) -> usize {
210        self.sync_state_set.len()
211    }
212
213    /// Check if a broker is the master
214    pub fn is_master(&self, broker_id: u64) -> bool {
215        self.master_broker_id == broker_id
216    }
217}
218
219/// Replicas manager
220///
221/// Manages broker replicas and sync state sets across the cluster.
222/// Provides functionality for:
223/// - Replica registration and tracking
224/// - ISR (In-Sync Replicas) management
225/// - Master election and failover
226pub struct ReplicasManager {
227    /// Configuration
228    config: Arc<ControllerConfig>,
229
230    /// Replicas: broker_name -> (broker_id -> BrokerReplicaInfo)
231    replicas: Arc<DashMap<String, HashMap<u64, BrokerReplicaInfo>>>,
232
233    /// Sync state sets: broker_name -> SyncStateSet
234    sync_state_sets: Arc<DashMap<String, SyncStateSet>>,
235}
236
237impl ReplicasManager {
238    /// Create a new replicas manager
239    pub fn new(config: Arc<ControllerConfig>) -> Self {
240        Self {
241            config,
242            replicas: Arc::new(DashMap::new()),
243            sync_state_sets: Arc::new(DashMap::new()),
244        }
245    }
246
247    /// Start the replicas manager
248    pub async fn start(&self) -> Result<()> {
249        info!("Starting replicas manager");
250        Ok(())
251    }
252
253    /// Shutdown the replicas manager
254    pub async fn shutdown(&self) -> Result<()> {
255        info!("Shutting down replicas manager");
256        self.replicas.clear();
257        self.sync_state_sets.clear();
258        Ok(())
259    }
260
261    /// Register a replica
262    pub async fn register_replica(&self, replica: BrokerReplicaInfo) -> Result<()> {
263        let broker_name = replica.broker_name.clone();
264        let broker_id = replica.broker_id;
265        let is_master = replica.is_master();
266
267        debug!(
268            "Registering replica: {}:{} (role={:?})",
269            broker_name, broker_id, replica.role
270        );
271
272        // Update replicas map
273        self.replicas
274            .entry(broker_name.clone())
275            .or_default()
276            .insert(broker_id, replica.clone());
277
278        // If this is a master, initialize or update sync state set
279        if is_master {
280            let mut sync_state_set =
281                SyncStateSet::new(broker_name.clone(), broker_id, replica.broker_addr.clone());
282            sync_state_set.master_epoch = replica.epoch;
283            self.sync_state_sets.insert(broker_name, sync_state_set);
284        }
285
286        Ok(())
287    }
288
289    /// Unregister a replica
290    pub async fn unregister_replica(&self, broker_name: &str, broker_id: u64) -> Result<()> {
291        debug!("Unregistering replica: {}:{}", broker_name, broker_id);
292
293        // Remove from replicas map
294        if let Some(mut replicas) = self.replicas.get_mut(broker_name) {
295            replicas.remove(&broker_id);
296            if replicas.is_empty() {
297                drop(replicas);
298                self.replicas.remove(broker_name);
299            }
300        }
301
302        // Remove from sync state set
303        if let Some(mut sync_state_set) = self.sync_state_sets.get_mut(broker_name) {
304            sync_state_set.remove_broker(broker_id);
305        }
306
307        Ok(())
308    }
309
310    /// Get the master replica for a broker set
311    pub async fn get_master(&self, broker_name: &str) -> Option<BrokerReplicaInfo> {
312        self.replicas.get(broker_name).and_then(|replicas| {
313            replicas
314                .values()
315                .find(|replica| replica.is_master())
316                .cloned()
317        })
318    }
319
320    /// Get all replicas for a broker set
321    pub async fn get_replicas(&self, broker_name: &str) -> Vec<BrokerReplicaInfo> {
322        self.replicas
323            .get(broker_name)
324            .map(|replicas| replicas.values().cloned().collect())
325            .unwrap_or_default()
326    }
327
328    /// Get a specific replica
329    pub async fn get_replica(
330        &self,
331        broker_name: &str,
332        broker_id: u64,
333    ) -> Option<BrokerReplicaInfo> {
334        self.replicas
335            .get(broker_name)
336            .and_then(|replicas| replicas.get(&broker_id).cloned())
337    }
338
339    /// Get sync state set for a broker set
340    pub async fn get_sync_state_set(&self, broker_name: &str) -> Option<SyncStateSet> {
341        self.sync_state_sets.get(broker_name).map(|s| s.clone())
342    }
343
344    /// Add a broker to the sync state set
345    pub async fn add_to_sync_state_set(&self, broker_name: &str, broker_id: u64) -> Result<()> {
346        if let Some(mut sync_state_set) = self.sync_state_sets.get_mut(broker_name) {
347            sync_state_set.add_broker(broker_id);
348            debug!(
349                "Added broker {} to sync state set for {}",
350                broker_id, broker_name
351            );
352        }
353
354        // Update replica in-sync status
355        if let Some(mut replicas) = self.replicas.get_mut(broker_name) {
356            if let Some(replica) = replicas.get_mut(&broker_id) {
357                replica.in_sync = true;
358            }
359        }
360
361        Ok(())
362    }
363
364    /// Remove a broker from the sync state set
365    pub async fn remove_from_sync_state_set(
366        &self,
367        broker_name: &str,
368        broker_id: u64,
369    ) -> Result<()> {
370        if let Some(mut sync_state_set) = self.sync_state_sets.get_mut(broker_name) {
371            sync_state_set.remove_broker(broker_id);
372            warn!(
373                "Removed broker {} from sync state set for {}",
374                broker_id, broker_name
375            );
376        }
377
378        // Update replica in-sync status
379        if let Some(mut replicas) = self.replicas.get_mut(broker_name) {
380            if let Some(replica) = replicas.get_mut(&broker_id) {
381                replica.in_sync = false;
382            }
383        }
384
385        Ok(())
386    }
387
388    /// Update sync state set
389    pub async fn update_sync_state_set(
390        &self,
391        broker_name: &str,
392        new_sync_state_set: Vec<u64>,
393    ) -> Result<()> {
394        if let Some(mut sync_state_set) = self.sync_state_sets.get_mut(broker_name) {
395            sync_state_set.sync_state_set = new_sync_state_set.clone();
396            sync_state_set.last_update_timestamp = SystemTime::now()
397                .duration_since(SystemTime::UNIX_EPOCH)
398                .unwrap()
399                .as_secs();
400
401            debug!(
402                "Updated sync state set for {}: {:?}",
403                broker_name, new_sync_state_set
404            );
405        }
406
407        // Update replica in-sync status
408        if let Some(mut replicas) = self.replicas.get_mut(broker_name) {
409            for (broker_id, replica) in replicas.iter_mut() {
410                replica.in_sync = new_sync_state_set.contains(broker_id);
411            }
412        }
413
414        Ok(())
415    }
416
417    /// Elect a new master for a broker set
418    ///
419    /// This is typically called when the current master fails.
420    /// A new master is elected from the in-sync replicas.
421    pub async fn elect_master(&self, broker_name: &str) -> Result<Option<BrokerReplicaInfo>> {
422        info!("Electing new master for broker set: {}", broker_name);
423
424        // Get current sync state set
425        let sync_state_set = match self.sync_state_sets.get(broker_name) {
426            Some(s) => s.clone(),
427            None => {
428                warn!("No sync state set found for {}", broker_name);
429                return Ok(None);
430            }
431        };
432
433        // Find the first in-sync slave to promote
434        let new_master_id = sync_state_set
435            .sync_state_set
436            .iter()
437            .find(|&&id| id != sync_state_set.master_broker_id)
438            .copied();
439
440        let new_master_id = match new_master_id {
441            Some(id) => id,
442            None => {
443                warn!("No in-sync slaves available for {}", broker_name);
444                return Ok(None);
445            }
446        };
447
448        // Promote the slave to master
449        let mut new_master = None;
450        if let Some(mut replicas) = self.replicas.get_mut(broker_name) {
451            // Demote old master if it still exists
452            if let Some(old_master) = replicas.get_mut(&sync_state_set.master_broker_id) {
453                old_master.role = ReplicaRole::Slave;
454            }
455
456            // Promote new master
457            if let Some(replica) = replicas.get_mut(&new_master_id) {
458                replica.role = ReplicaRole::Master;
459                replica.epoch += 1;
460                new_master = Some(replica.clone());
461
462                info!(
463                    "Elected new master for {}: {} (epoch={})",
464                    broker_name, new_master_id, replica.epoch
465                );
466            }
467        }
468
469        // Update sync state set
470        if let (Some(master), Some(mut sync_state_set)) = (
471            new_master.as_ref(),
472            self.sync_state_sets.get_mut(broker_name),
473        ) {
474            sync_state_set.master_broker_id = new_master_id;
475            sync_state_set.master_addr = master.broker_addr.clone();
476            sync_state_set.master_epoch = master.epoch;
477            sync_state_set.last_update_timestamp = SystemTime::now()
478                .duration_since(SystemTime::UNIX_EPOCH)
479                .unwrap()
480                .as_secs();
481        }
482
483        Ok(new_master)
484    }
485
486    /// List all broker sets
487    pub async fn list_broker_sets(&self) -> Vec<String> {
488        self.replicas
489            .iter()
490            .map(|entry| entry.key().clone())
491            .collect()
492    }
493
494    /// Get statistics
495    pub async fn get_stats(&self) -> HashMap<String, usize> {
496        let mut stats = HashMap::new();
497        stats.insert("broker_sets".to_string(), self.replicas.len());
498        stats.insert("sync_state_sets".to_string(), self.sync_state_sets.len());
499
500        let total_replicas: usize = self.replicas.iter().map(|entry| entry.value().len()).sum();
501        stats.insert("total_replicas".to_string(), total_replicas);
502
503        stats
504    }
505}
506
#[cfg(test)]
mod tests {
    use super::*;

    const CLUSTER: &str = "test-cluster";
    const BROKER: &str = "broker-a";

    /// Master replica `broker-a:0` at `127.0.0.1:10911`, epoch 1.
    fn sample_master() -> BrokerReplicaInfo {
        BrokerReplicaInfo::new_master(
            CLUSTER.to_string(),
            BROKER.to_string(),
            0,
            "127.0.0.1:10911".to_string(),
            1,
        )
    }

    /// Slave replica `broker-a:1` at `127.0.0.1:10912`.
    fn sample_slave() -> BrokerReplicaInfo {
        BrokerReplicaInfo::new_slave(
            CLUSTER.to_string(),
            BROKER.to_string(),
            1,
            "127.0.0.1:10912".to_string(),
        )
    }

    #[test]
    fn test_broker_replica_info() {
        let master = sample_master();
        assert!(master.is_master());
        assert!(master.is_in_sync());
        assert_eq!(master.replica_id(), "broker-a:0");
        assert_eq!(master.epoch, 1);

        let slave = sample_slave();
        assert!(!slave.is_master());
        assert!(!slave.is_in_sync());
        assert_eq!(slave.replica_id(), "broker-a:1");
        assert_eq!(slave.epoch, 0);
    }

    #[test]
    fn test_sync_state_set() {
        let mut isr = SyncStateSet::new(BROKER.to_string(), 0, "127.0.0.1:10911".to_string());

        // A fresh set contains only the master.
        assert_eq!(isr.size(), 1);
        assert!(isr.contains(0));
        assert!(isr.is_master(0));

        isr.add_broker(1);
        assert_eq!(isr.size(), 2);
        assert!(isr.contains(1));
        assert!(!isr.is_master(1));

        isr.remove_broker(1);
        assert_eq!(isr.size(), 1);
        assert!(!isr.contains(1));
    }

    #[tokio::test]
    async fn test_replicas_manager() {
        let config = Arc::new(ControllerConfig::new(1, "127.0.0.1:9876".parse().unwrap()));
        let manager = ReplicasManager::new(config);

        manager.register_replica(sample_master()).await.unwrap();
        manager.register_replica(sample_slave()).await.unwrap();

        assert_eq!(manager.get_replicas(BROKER).await.len(), 2);

        let current_master = manager
            .get_master(BROKER)
            .await
            .expect("a master should be registered");
        assert_eq!(current_master.broker_id, 0);

        // Bring broker 1 into the ISR, then fail over to it.
        manager.add_to_sync_state_set(BROKER, 1).await.unwrap();
        let elected = manager
            .elect_master(BROKER)
            .await
            .unwrap()
            .expect("an in-sync slave should be promoted");
        assert_eq!(elected.broker_id, 1);
    }
}