rocketmq_remoting/rpc/
client_metadata.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::collections::HashMap;
18use std::sync::Arc;
19
20use cheetah_string::CheetahString;
21use parking_lot::RwLock;
22use rocketmq_common::common::message::message_queue::MessageQueue;
23use rocketmq_common::common::mix_all;
24use rocketmq_common::common::mix_all::MASTER_ID;
25
26use crate::protocol::body::broker_body::cluster_info::ClusterInfo;
27use crate::protocol::route::topic_route_data::TopicRouteData;
28use crate::protocol::static_topic::topic_queue_info::TopicQueueMappingInfo;
29use crate::protocol::static_topic::topic_queue_mapping_utils::TopicQueueMappingUtils;
30
31#[derive(Default, Clone)]
32pub struct ClientMetadata {
33    topic_route_table: Arc<RwLock<HashMap<CheetahString /* Topic */, TopicRouteData>>>,
34    topic_end_points_table: Arc<
35        RwLock<
36            HashMap<
37                CheetahString, /* Topic */
38                HashMap<MessageQueue, CheetahString /* brokerName */>,
39            >,
40        >,
41    >,
42    broker_addr_table: Arc<
43        RwLock<
44            HashMap<
45                CheetahString, /* Broker Name */
46                HashMap<u64 /* brokerId */, CheetahString /* address */>,
47            >,
48        >,
49    >,
50    broker_version_table: Arc<
51        RwLock<
52            HashMap<
53                CheetahString, /* Broker Name */
54                HashMap<CheetahString /* address */, i32>,
55            >,
56        >,
57    >,
58}
59
60impl ClientMetadata {
61    pub fn new() -> Self {
62        Self {
63            topic_route_table: Arc::new(RwLock::new(HashMap::new())),
64            topic_end_points_table: Arc::new(RwLock::new(HashMap::new())),
65            broker_addr_table: Arc::new(RwLock::new(HashMap::new())),
66            broker_version_table: Arc::new(RwLock::new(HashMap::new())),
67        }
68    }
69
70    pub fn fresh_topic_route(
71        &self,
72        topic: &CheetahString,
73        topic_route_data: Option<TopicRouteData>,
74    ) {
75        if topic.is_empty() || topic_route_data.is_none() {
76            return;
77        }
78        let read_guard = self.topic_route_table.read();
79        let old = read_guard.get(topic);
80        if !topic_route_data
81            .as_ref()
82            .unwrap()
83            .topic_route_data_changed(old)
84        {
85            return;
86        }
87        drop(read_guard);
88        let topic_route_data = topic_route_data.unwrap();
89        {
90            let mut write_guard = self.broker_addr_table.write();
91            for bd in topic_route_data.broker_datas.iter() {
92                write_guard.insert(bd.broker_name().clone(), bd.broker_addrs().clone());
93            }
94        }
95
96        {
97            let mq_end_points = ClientMetadata::topic_route_data2endpoints_for_static_topic(
98                topic,
99                &topic_route_data,
100            );
101            if let Some(mq_end_points) = mq_end_points {
102                let mut write_guard = self.topic_end_points_table.write();
103                write_guard.insert(topic.clone(), mq_end_points);
104            }
105        }
106    }
107
108    pub fn get_broker_name_from_message_queue(&self, mq: &MessageQueue) -> Option<CheetahString> {
109        let read_guard = self.topic_end_points_table.read();
110        let topic_end_points = read_guard.get(mq.get_topic());
111        if topic_end_points.is_none() {
112            return Some(mq.get_broker_name().clone());
113        }
114        let topic_end_points = topic_end_points.unwrap();
115        let broker_name = topic_end_points.get(mq);
116        broker_name.cloned()
117    }
118
119    pub fn refresh_cluster_info(&self, cluster_info: Option<&ClusterInfo>) {
120        if cluster_info.is_none() {
121            return;
122        }
123        let cluster_info = cluster_info.unwrap();
124        if cluster_info.broker_addr_table.is_none() {
125            return;
126        }
127
128        let mut write_guard = self.broker_addr_table.write();
129        for (broker_name, broker_data) in cluster_info.broker_addr_table.as_ref().unwrap() {
130            let broker_addr = broker_data.broker_addrs();
131            write_guard.insert(broker_name.clone(), broker_addr.clone());
132        }
133    }
134
135    pub fn find_master_broker_addr(&self, broker_name: &str) -> Option<CheetahString> {
136        let read_guard = self.broker_addr_table.read();
137        if !read_guard.contains_key(broker_name) {
138            return None;
139        }
140        let broker_addr = read_guard.get(broker_name).unwrap().get(&(MASTER_ID));
141        broker_addr.cloned()
142    }
143
144    pub fn topic_route_data2endpoints_for_static_topic(
145        topic: &str,
146        topic_route_data: &TopicRouteData,
147    ) -> Option<HashMap<MessageQueue, CheetahString>> {
148        if topic_route_data.topic_queue_mapping_by_broker.is_none()
149            || topic_route_data
150                .topic_queue_mapping_by_broker
151                .as_ref()
152                .unwrap()
153                .is_empty()
154        {
155            return Some(HashMap::new());
156        }
157
158        let mut mq_end_points_of_broker = HashMap::new();
159        let mut mapping_infos_by_scope = HashMap::new();
160        for (broker_name, info) in topic_route_data
161            .topic_queue_mapping_by_broker
162            .as_ref()
163            .unwrap()
164            .iter()
165        {
166            let scope = info.scope.as_ref();
167            if let Some(scope_inner) = scope {
168                if !mapping_infos_by_scope.contains_key(scope_inner.as_str()) {
169                    mapping_infos_by_scope.insert(scope_inner.to_string(), HashMap::new());
170                }
171                mapping_infos_by_scope
172                    .get_mut(scope_inner.as_str())
173                    .unwrap()
174                    .insert(broker_name.to_string(), info.clone());
175            }
176        }
177
178        for (scope, topic_queue_mapping_info_map) in mapping_infos_by_scope {
179            let mut mq_endpoints: HashMap<MessageQueue, TopicQueueMappingInfo> = HashMap::new();
180            let mut mapping_infos: Vec<_> = topic_queue_mapping_info_map.iter().collect();
181            mapping_infos.sort_by(|a, b| b.1.epoch.cmp(&a.1.epoch));
182
183            let mut max_total_nums = 0;
184            let max_total_num_of_epoch = -1;
185
186            for (_, info) in mapping_infos {
187                if info.epoch >= max_total_num_of_epoch && info.total_queues > max_total_nums {
188                    max_total_nums = info.total_queues;
189                }
190                for global_id in info.curr_id_map.as_ref().unwrap().keys() {
191                    let mq = MessageQueue::from_parts(
192                        topic,
193                        TopicQueueMappingUtils::get_mock_broker_name(
194                            info.scope.as_ref().unwrap().as_str(),
195                        ),
196                        *global_id,
197                    );
198                    if let Some(old_info) = mq_endpoints.get(&mq) {
199                        if old_info.epoch <= info.epoch {
200                            mq_endpoints.insert(mq, info.clone());
201                        }
202                    } else {
203                        mq_endpoints.insert(mq, info.clone());
204                    }
205                }
206            }
207
208            for i in 0..max_total_nums {
209                let mq = MessageQueue::from_parts(
210                    topic,
211                    TopicQueueMappingUtils::get_mock_broker_name(&scope),
212                    i,
213                );
214                let broker_name = mq_endpoints
215                    .get(&mq)
216                    .map(|info| info.bname.clone().unwrap())
217                    .unwrap_or_else(|| {
218                        CheetahString::from_static_str(
219                            mix_all::LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST,
220                        )
221                    });
222                mq_end_points_of_broker.insert(mq, broker_name);
223            }
224        }
225
226        Some(mq_end_points_of_broker)
227    }
228
229    pub fn broker_addr_table(
230        &self,
231    ) -> Arc<RwLock<HashMap<CheetahString, HashMap<u64, CheetahString>>>> {
232        self.broker_addr_table.clone()
233    }
234}