rocketmq_remoting/protocol/route/
route_data_view.rs1use std::cmp::Ordering;
19use std::collections::HashMap;
20
21use cheetah_string::CheetahString;
22use rand::seq::IteratorRandom;
23use rocketmq_common::common::mix_all;
24use serde::Deserialize;
25use serde::Serialize;
26
27#[derive(Debug, Clone, Serialize, Deserialize, Default)]
28pub struct BrokerData {
29 cluster: CheetahString,
30 #[serde(rename = "brokerName")]
31 broker_name: CheetahString,
32 #[serde(rename = "brokerAddrs")]
33 broker_addrs: HashMap<u64 , CheetahString >,
34 #[serde(rename = "zoneName")]
35 zone_name: Option<CheetahString>,
36 #[serde(rename = "enableActingMaster")]
37 enable_acting_master: bool,
38}
39
40impl PartialEq for BrokerData {
41 fn eq(&self, other: &Self) -> bool {
42 self.broker_name == other.broker_name && self.broker_addrs == other.broker_addrs
43 }
44}
45
46impl Eq for BrokerData {}
47
48impl PartialOrd for BrokerData {
49 #[inline]
50 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
51 Some(self.cmp(other))
52 }
53}
54
55impl Ord for BrokerData {
56 #[inline]
57 fn cmp(&self, other: &Self) -> Ordering {
58 self.broker_name.cmp(&other.broker_name)
59 }
60}
61
62impl BrokerData {
63 pub fn new(
64 cluster: CheetahString,
65 broker_name: CheetahString,
66 broker_addrs: HashMap<u64, CheetahString>,
67 zone_name: Option<CheetahString>,
68 ) -> BrokerData {
69 BrokerData {
70 cluster,
71 broker_name,
72 broker_addrs,
73 zone_name,
74 enable_acting_master: false,
75 }
76 }
77
78 #[inline]
79 pub fn set_cluster(&mut self, cluster: CheetahString) {
80 self.cluster = cluster;
81 }
82
83 #[inline]
84 pub fn set_broker_name(&mut self, broker_name: CheetahString) {
85 self.broker_name = broker_name;
86 }
87
88 #[inline]
89 pub fn set_broker_addrs(&mut self, broker_addrs: HashMap<u64, CheetahString>) {
90 self.broker_addrs = broker_addrs;
91 }
92
93 #[inline]
94 pub fn set_zone_name(&mut self, zone_name: Option<CheetahString>) {
95 self.zone_name = zone_name;
96 }
97
98 #[inline]
99 pub fn set_enable_acting_master(&mut self, enable_acting_master: bool) {
100 self.enable_acting_master = enable_acting_master;
101 }
102
103 #[inline]
104 pub fn cluster(&self) -> &str {
105 &self.cluster
106 }
107
108 #[inline]
109 pub fn broker_name(&self) -> &CheetahString {
110 &self.broker_name
111 }
112
113 #[inline]
114 pub fn broker_addrs(&self) -> &HashMap<u64, CheetahString> {
115 &self.broker_addrs
116 }
117
118 #[inline]
119 pub fn broker_addrs_mut(&mut self) -> &mut HashMap<u64, CheetahString> {
120 &mut self.broker_addrs
121 }
122
123 #[inline]
124 pub fn remove_broker_by_addr(&mut self, broker_id: u64, broker_addr: &CheetahString) {
125 self.broker_addrs
126 .retain(|key, value| value != broker_addr || *key == broker_id);
127 }
128
129 #[inline]
130 pub fn zone_name(&self) -> Option<&CheetahString> {
131 self.zone_name.as_ref()
132 }
133
134 #[inline]
135 pub fn enable_acting_master(&self) -> bool {
136 self.enable_acting_master
137 }
138
139 pub fn select_broker_addr(&self) -> Option<CheetahString> {
140 let master_address = self.broker_addrs.get(&(mix_all::MASTER_ID)).cloned();
141 if master_address.is_none() {
142 return self.broker_addrs.values().choose(&mut rand::rng()).cloned();
143 }
144 master_address
145 }
146}
147
148#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd, Default)]
149pub struct QueueData {
150 #[serde(rename = "brokerName")]
151 pub broker_name: CheetahString,
152 #[serde(rename = "readQueueNums")]
153 pub read_queue_nums: u32,
154 #[serde(rename = "writeQueueNums")]
155 pub write_queue_nums: u32,
156 pub perm: u32,
157 #[serde(rename = "topicSysFlag")]
158 pub topic_sys_flag: u32,
159}
160
161impl QueueData {
162 pub fn new(
163 broker_name: CheetahString,
164 read_queue_nums: u32,
165 write_queue_nums: u32,
166 perm: u32,
167 topic_sys_flag: u32,
168 ) -> Self {
169 Self {
170 broker_name,
171 read_queue_nums,
172 write_queue_nums,
173 perm,
174 topic_sys_flag,
175 }
176 }
177
178 #[inline]
179 pub fn broker_name(&self) -> &CheetahString {
180 &self.broker_name
181 }
182
183 #[inline]
184 pub fn read_queue_nums(&self) -> u32 {
185 self.read_queue_nums
186 }
187
188 #[inline]
189 pub fn write_queue_nums(&self) -> u32 {
190 self.write_queue_nums
191 }
192
193 #[inline]
194 pub fn perm(&self) -> u32 {
195 self.perm
196 }
197
198 #[inline]
199 pub fn topic_sys_flag(&self) -> u32 {
200 self.topic_sys_flag
201 }
202}
203
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;

    /// `new` stores every argument unchanged and leaves acting-master off.
    #[test]
    fn broker_data_new_initializes_correctly() {
        let cluster = CheetahString::from("test_cluster");
        let broker_name = CheetahString::from("test_broker");
        let broker_addrs: HashMap<u64, CheetahString> = HashMap::new();
        let zone_name = CheetahString::from("test_zone");

        let broker_data = BrokerData::new(
            cluster.clone(),
            broker_name.clone(),
            broker_addrs.clone(),
            Some(zone_name.clone()),
        );

        assert_eq!(broker_data.cluster, cluster);
        assert_eq!(broker_data.broker_name, broker_name);
        assert_eq!(broker_data.broker_addrs, broker_addrs);
        // Compare the whole Option so a missing zone fails the test instead of
        // silently skipping the check.
        assert_eq!(broker_data.zone_name, Some(zone_name));
        assert!(!broker_data.enable_acting_master);
    }

    /// Every setter overwrites exactly its own field.
    #[test]
    fn broker_data_setters_work_correctly() {
        let mut broker_data = BrokerData::new(
            CheetahString::from("cluster1"),
            CheetahString::from("broker1"),
            HashMap::new(),
            None,
        );

        broker_data.set_cluster(CheetahString::from("cluster2"));
        broker_data.set_broker_name(CheetahString::from("broker2"));
        broker_data.set_broker_addrs(HashMap::from([(1, CheetahString::from("127.0.0.1"))]));
        broker_data.set_zone_name(Some(CheetahString::from("zone1")));
        broker_data.set_enable_acting_master(true);

        assert_eq!(broker_data.cluster, CheetahString::from("cluster2"));
        assert_eq!(broker_data.broker_name, CheetahString::from("broker2"));
        assert_eq!(
            broker_data.broker_addrs.get(&1),
            Some(&CheetahString::from("127.0.0.1"))
        );
        assert_eq!(broker_data.zone_name, Some(CheetahString::from("zone1")));
        assert!(broker_data.enable_acting_master);
    }

    /// An address is kept under the given id and dropped under any other id.
    #[test]
    fn broker_data_remove_broker_by_addr_works_correctly() {
        let mut broker_data = BrokerData::new(
            CheetahString::from("cluster1"),
            CheetahString::from("broker1"),
            HashMap::from([
                (1, CheetahString::from("127.0.0.1")),
                (2, CheetahString::from("127.0.0.2")),
            ]),
            None,
        );

        // The address is registered under the id passed in: nothing is removed.
        broker_data.remove_broker_by_addr(1, &"127.0.0.1".into());
        assert_eq!(broker_data.broker_addrs.len(), 2);
        assert!(broker_data.broker_addrs.contains_key(&1));
        assert!(broker_data.broker_addrs.contains_key(&2));

        // Same address, different id: the entry under id 1 is dropped, the
        // unrelated entry stays.
        broker_data.remove_broker_by_addr(0, &"127.0.0.1".into());
        assert_eq!(broker_data.broker_addrs.len(), 1);
        assert!(!broker_data.broker_addrs.contains_key(&1));
        assert!(broker_data.broker_addrs.contains_key(&2));
    }

    /// The master entry wins even when other entries exist.
    #[test]
    fn broker_data_select_broker_addr_returns_master_if_exists() {
        let broker_data = BrokerData::new(
            CheetahString::from("cluster1"),
            CheetahString::from("broker1"),
            HashMap::from([
                (mix_all::MASTER_ID, CheetahString::from("127.0.0.1")),
                (mix_all::MASTER_ID + 1, CheetahString::from("127.0.0.2")),
            ]),
            None,
        );

        let selected_addr = broker_data.select_broker_addr();
        assert_eq!(selected_addr, Some(CheetahString::from("127.0.0.1")));
    }

    /// Without a master entry, some other entry is returned.
    #[test]
    fn broker_data_select_broker_addr_returns_random_if_no_master() {
        let broker_data = BrokerData::new(
            CheetahString::from("cluster1"),
            CheetahString::from("broker1"),
            HashMap::from([(mix_all::MASTER_ID + 2, CheetahString::from("127.0.0.2"))]),
            None,
        );

        let selected_addr = broker_data.select_broker_addr();
        assert_eq!(selected_addr, Some(CheetahString::from("127.0.0.2")));
    }

    /// An empty address table yields `None` rather than panicking.
    #[test]
    fn broker_data_select_broker_addr_returns_none_when_empty() {
        let broker_data = BrokerData::new(
            CheetahString::from("cluster1"),
            CheetahString::from("broker1"),
            HashMap::new(),
            None,
        );

        assert!(broker_data.select_broker_addr().is_none());
    }

    /// `QueueData::new` stores every argument in the matching field.
    #[test]
    fn queue_data_new_initializes_correctly() {
        let queue_data = QueueData::new(CheetahString::from("broker1"), 4, 4, 6, 0);

        assert_eq!(queue_data.broker_name, CheetahString::from("broker1"));
        assert_eq!(queue_data.read_queue_nums, 4);
        assert_eq!(queue_data.write_queue_nums, 4);
        assert_eq!(queue_data.perm, 6);
        assert_eq!(queue_data.topic_sys_flag, 0);
    }
}