rocketmq_remoting/rpc/
client_metadata.rs1use std::collections::HashMap;
18use std::sync::Arc;
19
20use cheetah_string::CheetahString;
21use parking_lot::RwLock;
22use rocketmq_common::common::message::message_queue::MessageQueue;
23use rocketmq_common::common::mix_all;
24use rocketmq_common::common::mix_all::MASTER_ID;
25
26use crate::protocol::body::broker_body::cluster_info::ClusterInfo;
27use crate::protocol::route::topic_route_data::TopicRouteData;
28use crate::protocol::static_topic::topic_queue_info::TopicQueueMappingInfo;
29use crate::protocol::static_topic::topic_queue_mapping_utils::TopicQueueMappingUtils;
30
31#[derive(Default, Clone)]
32pub struct ClientMetadata {
33 topic_route_table: Arc<RwLock<HashMap<CheetahString , TopicRouteData>>>,
34 topic_end_points_table: Arc<
35 RwLock<
36 HashMap<
37 CheetahString, HashMap<MessageQueue, CheetahString >,
39 >,
40 >,
41 >,
42 broker_addr_table: Arc<
43 RwLock<
44 HashMap<
45 CheetahString, HashMap<u64 , CheetahString >,
47 >,
48 >,
49 >,
50 broker_version_table: Arc<
51 RwLock<
52 HashMap<
53 CheetahString, HashMap<CheetahString , i32>,
55 >,
56 >,
57 >,
58}
59
60impl ClientMetadata {
61 pub fn new() -> Self {
62 Self {
63 topic_route_table: Arc::new(RwLock::new(HashMap::new())),
64 topic_end_points_table: Arc::new(RwLock::new(HashMap::new())),
65 broker_addr_table: Arc::new(RwLock::new(HashMap::new())),
66 broker_version_table: Arc::new(RwLock::new(HashMap::new())),
67 }
68 }
69
70 pub fn fresh_topic_route(
71 &self,
72 topic: &CheetahString,
73 topic_route_data: Option<TopicRouteData>,
74 ) {
75 if topic.is_empty() || topic_route_data.is_none() {
76 return;
77 }
78 let read_guard = self.topic_route_table.read();
79 let old = read_guard.get(topic);
80 if !topic_route_data
81 .as_ref()
82 .unwrap()
83 .topic_route_data_changed(old)
84 {
85 return;
86 }
87 drop(read_guard);
88 let topic_route_data = topic_route_data.unwrap();
89 {
90 let mut write_guard = self.broker_addr_table.write();
91 for bd in topic_route_data.broker_datas.iter() {
92 write_guard.insert(bd.broker_name().clone(), bd.broker_addrs().clone());
93 }
94 }
95
96 {
97 let mq_end_points = ClientMetadata::topic_route_data2endpoints_for_static_topic(
98 topic,
99 &topic_route_data,
100 );
101 if let Some(mq_end_points) = mq_end_points {
102 let mut write_guard = self.topic_end_points_table.write();
103 write_guard.insert(topic.clone(), mq_end_points);
104 }
105 }
106 }
107
108 pub fn get_broker_name_from_message_queue(&self, mq: &MessageQueue) -> Option<CheetahString> {
109 let read_guard = self.topic_end_points_table.read();
110 let topic_end_points = read_guard.get(mq.get_topic());
111 if topic_end_points.is_none() {
112 return Some(mq.get_broker_name().clone());
113 }
114 let topic_end_points = topic_end_points.unwrap();
115 let broker_name = topic_end_points.get(mq);
116 broker_name.cloned()
117 }
118
119 pub fn refresh_cluster_info(&self, cluster_info: Option<&ClusterInfo>) {
120 if cluster_info.is_none() {
121 return;
122 }
123 let cluster_info = cluster_info.unwrap();
124 if cluster_info.broker_addr_table.is_none() {
125 return;
126 }
127
128 let mut write_guard = self.broker_addr_table.write();
129 for (broker_name, broker_data) in cluster_info.broker_addr_table.as_ref().unwrap() {
130 let broker_addr = broker_data.broker_addrs();
131 write_guard.insert(broker_name.clone(), broker_addr.clone());
132 }
133 }
134
135 pub fn find_master_broker_addr(&self, broker_name: &str) -> Option<CheetahString> {
136 let read_guard = self.broker_addr_table.read();
137 if !read_guard.contains_key(broker_name) {
138 return None;
139 }
140 let broker_addr = read_guard.get(broker_name).unwrap().get(&(MASTER_ID));
141 broker_addr.cloned()
142 }
143
144 pub fn topic_route_data2endpoints_for_static_topic(
145 topic: &str,
146 topic_route_data: &TopicRouteData,
147 ) -> Option<HashMap<MessageQueue, CheetahString>> {
148 if topic_route_data.topic_queue_mapping_by_broker.is_none()
149 || topic_route_data
150 .topic_queue_mapping_by_broker
151 .as_ref()
152 .unwrap()
153 .is_empty()
154 {
155 return Some(HashMap::new());
156 }
157
158 let mut mq_end_points_of_broker = HashMap::new();
159 let mut mapping_infos_by_scope = HashMap::new();
160 for (broker_name, info) in topic_route_data
161 .topic_queue_mapping_by_broker
162 .as_ref()
163 .unwrap()
164 .iter()
165 {
166 let scope = info.scope.as_ref();
167 if let Some(scope_inner) = scope {
168 if !mapping_infos_by_scope.contains_key(scope_inner.as_str()) {
169 mapping_infos_by_scope.insert(scope_inner.to_string(), HashMap::new());
170 }
171 mapping_infos_by_scope
172 .get_mut(scope_inner.as_str())
173 .unwrap()
174 .insert(broker_name.to_string(), info.clone());
175 }
176 }
177
178 for (scope, topic_queue_mapping_info_map) in mapping_infos_by_scope {
179 let mut mq_endpoints: HashMap<MessageQueue, TopicQueueMappingInfo> = HashMap::new();
180 let mut mapping_infos: Vec<_> = topic_queue_mapping_info_map.iter().collect();
181 mapping_infos.sort_by(|a, b| b.1.epoch.cmp(&a.1.epoch));
182
183 let mut max_total_nums = 0;
184 let max_total_num_of_epoch = -1;
185
186 for (_, info) in mapping_infos {
187 if info.epoch >= max_total_num_of_epoch && info.total_queues > max_total_nums {
188 max_total_nums = info.total_queues;
189 }
190 for global_id in info.curr_id_map.as_ref().unwrap().keys() {
191 let mq = MessageQueue::from_parts(
192 topic,
193 TopicQueueMappingUtils::get_mock_broker_name(
194 info.scope.as_ref().unwrap().as_str(),
195 ),
196 *global_id,
197 );
198 if let Some(old_info) = mq_endpoints.get(&mq) {
199 if old_info.epoch <= info.epoch {
200 mq_endpoints.insert(mq, info.clone());
201 }
202 } else {
203 mq_endpoints.insert(mq, info.clone());
204 }
205 }
206 }
207
208 for i in 0..max_total_nums {
209 let mq = MessageQueue::from_parts(
210 topic,
211 TopicQueueMappingUtils::get_mock_broker_name(&scope),
212 i,
213 );
214 let broker_name = mq_endpoints
215 .get(&mq)
216 .map(|info| info.bname.clone().unwrap())
217 .unwrap_or_else(|| {
218 CheetahString::from_static_str(
219 mix_all::LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST,
220 )
221 });
222 mq_end_points_of_broker.insert(mq, broker_name);
223 }
224 }
225
226 Some(mq_end_points_of_broker)
227 }
228
229 pub fn broker_addr_table(
230 &self,
231 ) -> Arc<RwLock<HashMap<CheetahString, HashMap<u64, CheetahString>>>> {
232 self.broker_addr_table.clone()
233 }
234}