rocketmq_remoting/protocol/route/
route_data_view.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::cmp::Ordering;
19use std::collections::HashMap;
20
21use cheetah_string::CheetahString;
22use rand::seq::IteratorRandom;
23use rocketmq_common::common::mix_all;
24use serde::Deserialize;
25use serde::Serialize;
26
27#[derive(Debug, Clone, Serialize, Deserialize, Default)]
28pub struct BrokerData {
29    cluster: CheetahString,
30    #[serde(rename = "brokerName")]
31    broker_name: CheetahString,
32    #[serde(rename = "brokerAddrs")]
33    broker_addrs: HashMap<u64 /* broker id */, CheetahString /* broker ip */>,
34    #[serde(rename = "zoneName")]
35    zone_name: Option<CheetahString>,
36    #[serde(rename = "enableActingMaster")]
37    enable_acting_master: bool,
38}
39
40impl PartialEq for BrokerData {
41    fn eq(&self, other: &Self) -> bool {
42        self.broker_name == other.broker_name && self.broker_addrs == other.broker_addrs
43    }
44}
45
// Marker impl: declares the `PartialEq` relation on `BrokerData` to be a full equivalence.
impl Eq for BrokerData {}
47
/// Partial ordering for [`BrokerData`]. It always yields `Some`, because it simply forwards to
/// the total order defined by the `Ord` implementation.
impl PartialOrd for BrokerData {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Delegate to `Ord::cmp` so both orderings can never disagree.
        Some(self.cmp(other))
    }
}
54
55impl Ord for BrokerData {
56    #[inline]
57    fn cmp(&self, other: &Self) -> Ordering {
58        self.broker_name.cmp(&other.broker_name)
59    }
60}
61
62impl BrokerData {
63    pub fn new(
64        cluster: CheetahString,
65        broker_name: CheetahString,
66        broker_addrs: HashMap<u64, CheetahString>,
67        zone_name: Option<CheetahString>,
68    ) -> BrokerData {
69        BrokerData {
70            cluster,
71            broker_name,
72            broker_addrs,
73            zone_name,
74            enable_acting_master: false,
75        }
76    }
77
78    #[inline]
79    pub fn set_cluster(&mut self, cluster: CheetahString) {
80        self.cluster = cluster;
81    }
82
83    #[inline]
84    pub fn set_broker_name(&mut self, broker_name: CheetahString) {
85        self.broker_name = broker_name;
86    }
87
88    #[inline]
89    pub fn set_broker_addrs(&mut self, broker_addrs: HashMap<u64, CheetahString>) {
90        self.broker_addrs = broker_addrs;
91    }
92
93    #[inline]
94    pub fn set_zone_name(&mut self, zone_name: Option<CheetahString>) {
95        self.zone_name = zone_name;
96    }
97
98    #[inline]
99    pub fn set_enable_acting_master(&mut self, enable_acting_master: bool) {
100        self.enable_acting_master = enable_acting_master;
101    }
102
103    #[inline]
104    pub fn cluster(&self) -> &str {
105        &self.cluster
106    }
107
108    #[inline]
109    pub fn broker_name(&self) -> &CheetahString {
110        &self.broker_name
111    }
112
113    #[inline]
114    pub fn broker_addrs(&self) -> &HashMap<u64, CheetahString> {
115        &self.broker_addrs
116    }
117
118    #[inline]
119    pub fn broker_addrs_mut(&mut self) -> &mut HashMap<u64, CheetahString> {
120        &mut self.broker_addrs
121    }
122
123    #[inline]
124    pub fn remove_broker_by_addr(&mut self, broker_id: u64, broker_addr: &CheetahString) {
125        self.broker_addrs
126            .retain(|key, value| value != broker_addr || *key == broker_id);
127    }
128
129    #[inline]
130    pub fn zone_name(&self) -> Option<&CheetahString> {
131        self.zone_name.as_ref()
132    }
133
134    #[inline]
135    pub fn enable_acting_master(&self) -> bool {
136        self.enable_acting_master
137    }
138
139    pub fn select_broker_addr(&self) -> Option<CheetahString> {
140        let master_address = self.broker_addrs.get(&(mix_all::MASTER_ID)).cloned();
141        if master_address.is_none() {
142            return self.broker_addrs.values().choose(&mut rand::rng()).cloned();
143        }
144        master_address
145    }
146}
147
148#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd, Default)]
149pub struct QueueData {
150    #[serde(rename = "brokerName")]
151    pub broker_name: CheetahString,
152    #[serde(rename = "readQueueNums")]
153    pub read_queue_nums: u32,
154    #[serde(rename = "writeQueueNums")]
155    pub write_queue_nums: u32,
156    pub perm: u32,
157    #[serde(rename = "topicSysFlag")]
158    pub topic_sys_flag: u32,
159}
160
161impl QueueData {
162    pub fn new(
163        broker_name: CheetahString,
164        read_queue_nums: u32,
165        write_queue_nums: u32,
166        perm: u32,
167        topic_sys_flag: u32,
168    ) -> Self {
169        Self {
170            broker_name,
171            read_queue_nums,
172            write_queue_nums,
173            perm,
174            topic_sys_flag,
175        }
176    }
177
178    #[inline]
179    pub fn broker_name(&self) -> &CheetahString {
180        &self.broker_name
181    }
182
183    #[inline]
184    pub fn read_queue_nums(&self) -> u32 {
185        self.read_queue_nums
186    }
187
188    #[inline]
189    pub fn write_queue_nums(&self) -> u32 {
190        self.write_queue_nums
191    }
192
193    #[inline]
194    pub fn perm(&self) -> u32 {
195        self.perm
196    }
197
198    #[inline]
199    pub fn topic_sys_flag(&self) -> u32 {
200        self.topic_sys_flag
201    }
202}
203
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;

    #[test]
    fn broker_data_new_initializes_correctly() {
        let cluster = CheetahString::from("test_cluster");
        let broker_name = CheetahString::from("test_broker");
        let broker_addrs = HashMap::new();
        let zone_name = CheetahString::from("test_zone");

        let broker_data = BrokerData::new(
            cluster.clone(),
            broker_name.clone(),
            broker_addrs.clone(),
            Some(zone_name.clone()),
        );

        assert_eq!(broker_data.cluster, cluster);
        assert_eq!(broker_data.broker_name, broker_name);
        assert_eq!(broker_data.broker_addrs, broker_addrs);
        // Compare the whole Option so a missing zone name fails the test.
        assert_eq!(broker_data.zone_name(), Some(&zone_name));
        assert!(!broker_data.enable_acting_master);
    }

    #[test]
    fn broker_data_setters_work_correctly() {
        let mut broker_data = BrokerData::new(
            CheetahString::from("cluster1"),
            CheetahString::from("broker1"),
            HashMap::new(),
            None,
        );

        broker_data.set_cluster(CheetahString::from("cluster2"));
        broker_data.set_broker_name(CheetahString::from("broker2"));
        broker_data.set_broker_addrs(HashMap::from([(1, CheetahString::from("127.0.0.1"))]));
        broker_data.set_zone_name(Some(CheetahString::from("zone1")));
        broker_data.set_enable_acting_master(true);

        assert_eq!(broker_data.cluster, CheetahString::from("cluster2"));
        assert_eq!(broker_data.broker_name, CheetahString::from("broker2"));
        assert_eq!(
            broker_data.broker_addrs.get(&1),
            Some(&CheetahString::from("127.0.0.1"))
        );
        assert_eq!(
            broker_data.zone_name(),
            Some(&CheetahString::from("zone1"))
        );
        assert!(broker_data.enable_acting_master);
    }

    #[test]
    fn broker_data_remove_broker_by_addr_keeps_entry_with_matching_id() {
        let mut broker_data = BrokerData::new(
            CheetahString::from("cluster1"),
            CheetahString::from("broker1"),
            HashMap::from([
                (1, CheetahString::from("127.0.0.1")),
                (2, CheetahString::from("127.0.0.2")),
            ]),
            None,
        );

        // The address is registered under the very id passed in, so nothing is removed.
        broker_data.remove_broker_by_addr(1, &CheetahString::from("127.0.0.1"));

        assert_eq!(broker_data.broker_addrs.len(), 2);
        assert!(broker_data.broker_addrs.contains_key(&1));
        assert!(broker_data.broker_addrs.contains_key(&2));
    }

    #[test]
    fn broker_data_remove_broker_by_addr_removes_same_addr_under_other_id() {
        let mut broker_data = BrokerData::new(
            CheetahString::from("cluster1"),
            CheetahString::from("broker1"),
            HashMap::from([
                (1, CheetahString::from("127.0.0.1")),
                (2, CheetahString::from("127.0.0.2")),
            ]),
            None,
        );

        // The address is stored under id 1, not id 0, so that entry is dropped.
        broker_data.remove_broker_by_addr(0, &CheetahString::from("127.0.0.1"));

        assert!(!broker_data.broker_addrs.contains_key(&1));
        assert!(broker_data.broker_addrs.contains_key(&2));
        assert_eq!(broker_data.broker_addrs.len(), 1);
    }

    #[test]
    fn broker_data_select_broker_addr_returns_master_if_exists() {
        let broker_data = BrokerData::new(
            CheetahString::from("cluster1"),
            CheetahString::from("broker1"),
            HashMap::from([(mix_all::MASTER_ID, CheetahString::from("127.0.0.1"))]),
            None,
        );

        let selected_addr = broker_data.select_broker_addr();
        assert_eq!(selected_addr, Some(CheetahString::from("127.0.0.1")));
    }

    #[test]
    fn broker_data_select_broker_addr_returns_random_if_no_master() {
        let broker_data = BrokerData::new(
            CheetahString::from("cluster1"),
            CheetahString::from("broker1"),
            HashMap::from([(2, CheetahString::from("127.0.0.2"))]),
            None,
        );

        // With a single non-master entry the random choice is deterministic.
        let selected_addr = broker_data.select_broker_addr();
        assert_eq!(selected_addr, Some(CheetahString::from("127.0.0.2")));
    }

    #[test]
    fn broker_data_select_broker_addr_returns_none_when_empty() {
        let broker_data = BrokerData::new(
            CheetahString::from("cluster1"),
            CheetahString::from("broker1"),
            HashMap::new(),
            None,
        );

        assert!(broker_data.select_broker_addr().is_none());
    }

    #[test]
    fn queue_data_new_initializes_correctly() {
        let queue_data = QueueData::new(CheetahString::from("broker1"), 4, 4, 6, 0);

        assert_eq!(queue_data.broker_name, CheetahString::from("broker1"));
        assert_eq!(queue_data.read_queue_nums, 4);
        assert_eq!(queue_data.write_queue_nums, 4);
        assert_eq!(queue_data.perm, 6);
        assert_eq!(queue_data.topic_sys_flag, 0);
    }
}