Skip to main content

rocketmq_client_rust/factory/
mq_client_instance.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::HashMap;
17use std::collections::HashSet;
18use std::sync::atomic::AtomicI64;
19use std::sync::Arc;
20use std::time::Duration;
21
22use cheetah_string::CheetahString;
23use dashmap::DashMap;
24use futures::future;
25use rand::seq::IndexedRandom;
26use rocketmq_common::common::base::service_state::ServiceState;
27use rocketmq_common::common::boundary_type::BoundaryType;
28use rocketmq_common::common::config::TopicConfig;
29use rocketmq_common::common::constant::PermName;
30use rocketmq_common::common::filter::expression_type::ExpressionType;
31use rocketmq_common::common::message::message_decoder;
32use rocketmq_common::common::message::message_ext::MessageExt;
33use rocketmq_common::common::message::message_queue::MessageQueue;
34use rocketmq_common::common::message::message_queue_assignment::MessageQueueAssignment;
35use rocketmq_common::common::mix_all;
36use rocketmq_common::TimeUtils::current_millis;
37use rocketmq_remoting::base::connection_net_event::ConnectionNetEvent;
38use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;
39use rocketmq_remoting::protocol::header::get_topic_config_request_header::GetTopicConfigRequestHeader;
40use rocketmq_remoting::protocol::header::pull_message_request_header::PullMessageRequestHeader;
41use rocketmq_remoting::protocol::header::query_consumer_offset_request_header::QueryConsumerOffsetRequestHeader;
42use rocketmq_remoting::protocol::header::update_consumer_offset_header::UpdateConsumerOffsetRequestHeader;
43use rocketmq_remoting::protocol::heartbeat::consumer_data::ConsumerData;
44use rocketmq_remoting::protocol::heartbeat::heartbeat_data::HeartbeatData;
45use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
46use rocketmq_remoting::protocol::heartbeat::producer_data::ProducerData;
47use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData;
48use rocketmq_remoting::protocol::subscription::subscription_group_config::SubscriptionGroupConfig;
49use rocketmq_remoting::rpc::client_metadata::ClientMetadata;
50use rocketmq_remoting::runtime::config::client_config::TokioClientConfig;
51use rocketmq_remoting::runtime::RPCHook;
52use rocketmq_rust::schedule::simple_scheduler::ScheduledTaskManager;
53use rocketmq_rust::ArcMut;
54use rocketmq_rust::RocketMQTokioMutex;
55use tracing::error;
56use tracing::info;
57use tracing::warn;
58
59use crate::admin::mq_admin_ext_async_inner::MQAdminExtInnerImpl;
60use crate::base::client_config::ClientConfig;
61use crate::consumer::consumer_impl::pull_message_service::PullMessageService;
62use crate::consumer::consumer_impl::pull_request_ext::PullResultExt;
63use crate::consumer::consumer_impl::re_balance::rebalance_service::RebalanceService;
64use crate::consumer::mq_consumer_inner::MQConsumerInner;
65use crate::consumer::mq_consumer_inner::MQConsumerInnerImpl;
66use crate::consumer::pull_callback::PullCallback;
67use crate::consumer::pull_result::PullResult;
68use crate::consumer::pull_status::PullStatus;
69use crate::factory::client_tables::*;
70use crate::implementation::client_remoting_processor::ClientRemotingProcessor;
71use crate::implementation::communication_mode::CommunicationMode;
72use crate::implementation::find_broker_result::FindBrokerResult;
73use crate::implementation::mq_admin_impl::MQAdminImpl;
74use crate::implementation::mq_client_api_impl::MQClientAPIImpl;
75use crate::producer::default_mq_producer::DefaultMQProducer;
76use crate::producer::default_mq_producer::ProducerConfig;
77use crate::producer::producer_impl::mq_producer_inner::MQProducerInnerImpl;
78use crate::producer::producer_impl::topic_publish_info::TopicPublishInfo;
79use crate::stat::consumer_stats_manager::ConsumerStatsManager;
80
/// Timeout, in milliseconds, used when acquiring a client lock with a deadline
/// (`clean_offline_broker` passes it to `try_lock_timeout` on the name-server lock).
const LOCK_TIMEOUT_MILLIS: u64 = 3000;
82
/// Per-client hub that owns the registration tables, cached routing data and background
/// services used by the producers, consumers and admin tools sharing one client id.
pub struct MQClientInstance {
    /// Client configuration this instance was created with.
    pub(crate) client_config: ArcMut<ClientConfig>,
    /// Identifier of this client instance; included in log messages.
    pub(crate) client_id: CheetahString,
    /// Value of `current_millis()` captured when the instance was constructed.
    boot_timestamp: u64,
    /// The container of the producer in the current client. The key is the name of
    /// producerGroup.
    producer_table: ProducerTable,
    /// The container of the consumer in the current client. The key is the name of
    /// consumer_group.
    consumer_table: ConsumerTable,
    /// The container of the adminExt in the current client. The key is the name of
    /// adminExtGroup.
    admin_ext_table: AdminExtTable,
    /// Remoting API client; `None` only while `new_arc` is still wiring the instance.
    pub(crate) mq_client_api_impl: Option<ArcMut<MQClientAPIImpl>>,
    /// Admin implementation; receives a handle to this instance in `new_arc`.
    pub(crate) mq_admin_impl: ArcMut<MQAdminImpl>,
    /// Cached route data, keyed by topic.
    pub(crate) topic_route_table: TopicRouteTable,
    /// Cached static-topic endpoint mapping, keyed by topic.
    topic_end_points_table: TopicEndPointsTable,
    /// Serialises route refreshes and offline-broker cleanup.
    lock_namesrv: Arc<RocketMQTokioMutex<()>>,
    /// Serialises heartbeat rounds.
    lock_heartbeat: Arc<RocketMQTokioMutex<()>>,

    /// Lifecycle state driven by `start` and `shutdown`.
    service_state: ServiceState,
    /// Pull service started in `start` and stopped in `shutdown`.
    pub(crate) pull_message_service: ArcMut<PullMessageService>,
    /// Rebalance service started in `start`, woken by `re_balance_immediately`/`re_balance_later`.
    rebalance_service: RebalanceService,
    /// Internal producer created with group `mix_all::CLIENT_INNER_PRODUCER_GROUP`.
    pub(crate) default_producer: ArcMut<DefaultMQProducer>,
    /// Broker name -> (broker id -> address), filled from route data.
    broker_addr_table: BrokerAddrTable,
    /// Broker version cache; cleared on shutdown.
    // NOTE(review): exact key/value layout is defined in `client_tables` — confirm there.
    broker_version_table: BrokerVersionTable,
    /// Counter initialised to zero at construction.
    // NOTE(review): presumably counts heartbeat rounds; the increment is not visible here.
    send_heartbeat_times_total: Arc<AtomicI64>,
    /// Owner of the periodic background tasks registered in `start_scheduled_task`.
    scheduled_task_manager: ScheduledTaskManager,
    /// HeartbeatV2: Cache of broker address -> last fingerprint
    broker_heartbeat_fingerprint_table: BrokerHeartbeatFingerprintTable,
    /// HeartbeatV2: Set of brokers that support V2 protocol
    broker_support_v2_heartbeat_set: BrokerSupportV2HeartbeatSet,
    /// Consumer statistics collector; started in `start`.
    consumer_stats_manager: ConsumerStatsManager,
}
123
124impl MQClientInstance {
125    pub fn new_arc(
126        client_config: ClientConfig,
127        _instance_index: i32,
128        client_id: impl Into<CheetahString>,
129        rpc_hook: Option<Arc<dyn RPCHook>>,
130    ) -> ArcMut<MQClientInstance> {
131        let client_id = client_id.into();
132        let shared_config = ArcMut::new(client_config.clone());
133
134        let broker_addr_table = Arc::new(DashMap::default());
135        let producer_table = Arc::new(DashMap::default());
136        let consumer_table = Arc::new(DashMap::default());
137        let admin_ext_table = Arc::new(DashMap::default());
138        let topic_route_table = Arc::new(DashMap::default());
139        let topic_end_points_table = Arc::new(DashMap::default());
140        let broker_version_table = Arc::new(DashMap::default());
141        let broker_heartbeat_fingerprint_table = Arc::new(DashMap::default());
142        let broker_support_v2_heartbeat_set = Arc::new(DashMap::default());
143
144        let default_producer = ArcMut::new(
145            DefaultMQProducer::builder()
146                .producer_group(mix_all::CLIENT_INNER_PRODUCER_GROUP)
147                .client_config(client_config.clone())
148                .build(),
149        );
150        let mut instance = ArcMut::new(MQClientInstance {
151            client_config: shared_config,
152            client_id,
153            boot_timestamp: current_millis(),
154            producer_table,
155            consumer_table,
156            admin_ext_table,
157            mq_client_api_impl: None,
158            mq_admin_impl: ArcMut::new(MQAdminImpl::new()),
159            topic_route_table,
160            topic_end_points_table,
161            lock_namesrv: Arc::default(),
162            lock_heartbeat: Arc::default(),
163            service_state: ServiceState::CreateJust,
164            pull_message_service: ArcMut::new(PullMessageService::new()),
165            rebalance_service: RebalanceService::new(),
166            default_producer,
167            broker_addr_table,
168            broker_version_table,
169            send_heartbeat_times_total: Arc::new(AtomicI64::new(0)),
170            scheduled_task_manager: ScheduledTaskManager::new(),
171            broker_heartbeat_fingerprint_table,
172            broker_support_v2_heartbeat_set,
173            consumer_stats_manager: ConsumerStatsManager::new(),
174        });
175
176        // Clone instance first to avoid borrow checker issues
177        let instance_clone = instance.clone();
178        instance.mq_admin_impl.set_client(instance_clone);
179
180        let (tx, mut rx) = tokio::sync::broadcast::channel::<ConnectionNetEvent>(16);
181
182        let mq_client_api_impl = ArcMut::new(MQClientAPIImpl::new(
183            Arc::new(TokioClientConfig::default()),
184            ClientRemotingProcessor::new(instance.clone()),
185            rpc_hook,
186            Arc::new(client_config.clone()),
187            Some(tx),
188        ));
189
190        if let Some(namesrv_addr) = client_config.namesrv_addr.as_deref() {
191            let api_impl = mq_client_api_impl.clone();
192            let addr = namesrv_addr.to_string();
193            // Use block_in_place for synchronous initialization in async context
194            tokio::task::block_in_place(|| {
195                tokio::runtime::Handle::current().block_on(async move {
196                    api_impl.update_name_server_address_list(&addr).await;
197                })
198            });
199        }
200
201        instance.mq_client_api_impl = Some(mq_client_api_impl);
202
203        // Use weak reference to avoid circular dependencies
204        let weak_instance = ArcMut::downgrade(&instance);
205        tokio::spawn(async move {
206            while let Ok(value) = rx.recv().await {
207                if let Some(instance_) = weak_instance.upgrade() {
208                    match value {
209                        ConnectionNetEvent::CONNECTED(remote_address) => {
210                            info!("ConnectionNetEvent CONNECTED");
211
212                            let matched_brokers: Vec<_> = instance_
213                                .broker_addr_table
214                                .iter()
215                                .flat_map(|entry| {
216                                    let (name, addrs) = entry.pair();
217                                    addrs
218                                        .iter()
219                                        .filter(|(_, addr)| addr.as_str() == remote_address.to_string().as_str())
220                                        .map(|(id, addr)| (*id, name.clone(), addr.clone()))
221                                        .collect::<Vec<_>>()
222                                })
223                                .collect();
224
225                            for (id, broker_name, addr) in matched_brokers {
226                                if instance_.send_heartbeat_to_broker(id, &broker_name, &addr, false).await {
227                                    instance_.re_balance_immediately();
228                                }
229                            }
230                        }
231                        ConnectionNetEvent::DISCONNECTED => {}
232                        ConnectionNetEvent::EXCEPTION => {}
233                        ConnectionNetEvent::IDLE => {}
234                    }
235                }
236            }
237            warn!("ConnectionNetEvent recv error");
238        });
239        instance
240    }
241
242    /// Returns a reference to the [`ConsumerStatsManager`] held by this instance.
243    pub fn consumer_stats_manager(&self) -> &ConsumerStatsManager {
244        &self.consumer_stats_manager
245    }
246
247    pub fn re_balance_immediately(&self) {
248        self.rebalance_service.wakeup();
249    }
250
251    pub fn re_balance_later(&self, delay_millis: Duration) {
252        if delay_millis <= Duration::from_millis(0) {
253            self.rebalance_service.wakeup();
254        } else {
255            let service = self.rebalance_service.clone();
256            tokio::spawn(async move {
257                tokio::time::sleep(delay_millis).await;
258                service.wakeup();
259            });
260        }
261    }
262
263    pub async fn start(&mut self, this: ArcMut<Self>) -> rocketmq_error::RocketMQResult<()> {
264        match self.service_state {
265            ServiceState::CreateJust => {
266                self.service_state = ServiceState::StartFailed;
267                // If not specified,looking address from name remoting_server
268                if self.client_config.namesrv_addr.is_none() {
269                    self.mq_client_api_impl
270                        .as_mut()
271                        .expect("mq_client_api_impl is None")
272                        .fetch_name_server_addr()
273                        .await;
274                }
275                // Start request-response channel
276                self.mq_client_api_impl
277                    .as_mut()
278                    .expect("mq_client_api_impl is None")
279                    .start()
280                    .await;
281                // Start various schedule tasks
282                self.start_scheduled_task(this.clone());
283                // Start pull service
284                let instance = this.clone();
285                if let Err(e) = self.pull_message_service.start(instance).await {
286                    error!("Failed to start pull message service: {:?}", e);
287                }
288                // Start rebalance service
289                if let Err(e) = self.rebalance_service.start(this).await {
290                    error!("Failed to start rebalance service: {:?}", e);
291                }
292                // Start push service
293
294                self.default_producer
295                    .default_mqproducer_impl
296                    .as_mut()
297                    .unwrap()
298                    .start_with_factory(false)
299                    .await?;
300                // Start consumer stats manager
301                self.consumer_stats_manager.start();
302                info!("the client factory[{}] start OK", self.client_id);
303                self.service_state = ServiceState::Running;
304            }
305            ServiceState::Running => {}
306            ServiceState::ShutdownAlready => {}
307            ServiceState::StartFailed => {
308                return Err(mq_client_err!(format!(
309                    "The Factory object[{}] has been created before, and failed.",
310                    self.client_id
311                )));
312            }
313        }
314        Ok(())
315    }
316
317    pub async fn shutdown(&mut self) {
318        match self.service_state {
319            ServiceState::CreateJust | ServiceState::ShutdownAlready | ServiceState::StartFailed => {
320                warn!(
321                    "MQClientInstance shutdown called but state is {:?}, ignoring shutdown request",
322                    self.service_state
323                );
324                return;
325            }
326            ServiceState::Running => {
327                info!(
328                    "MQClientInstance[{}] shutdown starting, current state: Running",
329                    self.client_id
330                );
331            }
332        }
333
334        info!("MQClientInstance[{}] shutting down rebalance service", self.client_id);
335        if let Err(e) = self.rebalance_service.shutdown(3000).await {
336            warn!("Failed to shutdown rebalance service: {:?}", e);
337        }
338
339        info!(
340            "MQClientInstance[{}] shutting down pull message service",
341            self.client_id
342        );
343        if let Err(e) = self.pull_message_service.shutdown_default().await {
344            warn!("Failed to shutdown pull message service: {:?}", e);
345        }
346
347        info!("MQClientInstance[{}] persisting all consumer offsets", self.client_id);
348        self.persist_all_consumer_offset().await;
349
350        info!("MQClientInstance[{}] shutting down default producer", self.client_id);
351        if let Some(producer_impl) = self.default_producer.default_mqproducer_impl.as_mut() {
352            let shutdown_future = producer_impl.shutdown_with_factory(false);
353            if let Err(e) = Box::pin(shutdown_future).await {
354                warn!("Failed to shutdown default producer: {:?}", e);
355            }
356        }
357
358        info!("MQClientInstance[{}] unregistering all consumers", self.client_id);
359        self.unregister_all_consumers().await;
360
361        info!("MQClientInstance[{}] unregistering all producers", self.client_id);
362        self.unregister_all_producers().await;
363
364        if self.mq_client_api_impl.is_some() {
365            info!(
366                "MQClientInstance[{}] network client shutdown (deferred to Drop)",
367                self.client_id
368            );
369        }
370
371        info!("MQClientInstance[{}] canceling scheduled tasks", self.client_id);
372        self.scheduled_task_manager.cancel_all();
373        info!(
374            "MQClientInstance[{}] scheduled tasks canceled, total canceled: {}",
375            self.client_id,
376            self.scheduled_task_manager.task_count()
377        );
378
379        info!("MQClientInstance[{}] clearing all registration tables", self.client_id);
380        self.producer_table.clear();
381        self.consumer_table.clear();
382        self.admin_ext_table.clear();
383
384        self.topic_route_table.clear();
385        self.topic_end_points_table.clear();
386        self.broker_addr_table.clear();
387        self.broker_version_table.clear();
388
389        self.service_state = ServiceState::ShutdownAlready;
390
391        info!("MQClientInstance[{}] shutdown completed successfully", self.client_id);
392    }
393
394    /// Unregister all producers from broker
395    async fn unregister_all_producers(&mut self) {
396        // Get all producer groups before removing from table
397        let producer_groups: Vec<CheetahString> = self.producer_table.iter().map(|entry| entry.key().clone()).collect();
398
399        // Unregister each producer (removes from table and notifies broker)
400        for group in producer_groups {
401            info!("Unregistering producer group: {}", group);
402            self.unregister_producer(group).await;
403        }
404    }
405
406    /// Unregister all consumers from broker
407    async fn unregister_all_consumers(&mut self) {
408        // Get all consumer groups before removing from table
409        let consumer_groups: Vec<CheetahString> = self.consumer_table.iter().map(|entry| entry.key().clone()).collect();
410
411        // Unregister each consumer (removes from table and notifies broker)
412        for group in consumer_groups {
413            info!("Unregistering consumer group: {}", group);
414            self.unregister_consumer(group).await;
415        }
416    }
417
418    pub async fn register_producer(&mut self, group: &str, producer: MQProducerInnerImpl) -> bool {
419        if group.is_empty() {
420            return false;
421        }
422        if self.producer_table.contains_key(group) {
423            warn!("the producer group[{}] exist already.", group);
424            return false;
425        }
426        self.producer_table.insert(group.into(), producer);
427        true
428    }
429
430    pub async fn register_admin_ext(&mut self, group: &str, admin: MQAdminExtInnerImpl) -> bool {
431        if group.is_empty() {
432            return false;
433        }
434        if self.admin_ext_table.contains_key(group) {
435            warn!("the admin group[{}] exist already.", group);
436            return false;
437        }
438        self.admin_ext_table.insert(group.into(), admin);
439        true
440    }
441
442    fn start_scheduled_task(&mut self, this: ArcMut<Self>) {
443        info!("Starting scheduled tasks with ScheduledTaskManager");
444
445        if self.client_config.namesrv_addr.is_none() {
446            let mq_client_api_impl = self.mq_client_api_impl.as_ref().unwrap().clone();
447            self.scheduled_task_manager.add_fixed_rate_task_async(
448                Duration::from_secs(10),
449                Duration::from_secs(120),
450                async move |_token| {
451                    let mut api = mq_client_api_impl.clone();
452                    info!("ScheduledTask: fetchNameServerAddr");
453                    api.fetch_name_server_addr().await;
454                    Ok(())
455                },
456            );
457        }
458
459        let client_instance = this.clone();
460        let poll_name_server_interval = self.client_config.poll_name_server_interval;
461        self.scheduled_task_manager.add_fixed_rate_task_async(
462            Duration::from_millis(10),
463            Duration::from_millis(poll_name_server_interval as u64),
464            async move |_token| {
465                let mut instance = client_instance.clone();
466                info!("ScheduledTask: update_topic_route_info_from_name_server");
467                instance.update_topic_route_info_from_name_server().await;
468                Ok(())
469            },
470        );
471
472        let client_instance = this.clone();
473        let heartbeat_broker_interval = self.client_config.heartbeat_broker_interval;
474        self.scheduled_task_manager.add_fixed_rate_task_async(
475            Duration::from_secs(1),
476            Duration::from_millis(heartbeat_broker_interval as u64),
477            async move |_token| {
478                let mut instance = client_instance.clone();
479                info!("ScheduledTask: clean_offline_broker and send_heartbeat");
480                instance.clean_offline_broker().await;
481                instance.send_heartbeat_to_all_broker_with_lock().await;
482                Ok(())
483            },
484        );
485
486        let client_instance = this;
487        let persist_consumer_offset_interval = self.client_config.persist_consumer_offset_interval as u64;
488        self.scheduled_task_manager.add_fixed_rate_task_async(
489            Duration::from_secs(10),
490            Duration::from_millis(persist_consumer_offset_interval),
491            async move |_token| {
492                let mut instance = client_instance.clone();
493                info!("ScheduledTask: persistAllConsumerOffset");
494                instance.persist_all_consumer_offset().await;
495                Ok(())
496            },
497        );
498
499        info!(
500            "All scheduled tasks started, total tasks: {}",
501            self.scheduled_task_manager.task_count()
502        );
503    }
504
505    pub async fn update_topic_route_info_from_name_server(&mut self) {
506        let mut topic_list = HashSet::new();
507
508        for entry in self.producer_table.iter() {
509            topic_list.extend(entry.value().get_publish_topic_list());
510        }
511
512        for entry in self.consumer_table.iter() {
513            entry.value().subscriptions().iter().for_each(|sub| {
514                topic_list.insert(sub.topic.clone());
515            });
516        }
517
518        for topic in topic_list.iter() {
519            self.update_topic_route_info_from_name_server_topic(topic).await;
520        }
521    }
522
523    #[inline]
524    pub async fn update_topic_route_info_from_name_server_topic(&mut self, topic: &CheetahString) -> bool {
525        self.update_topic_route_info_from_name_server_default(topic, false, None)
526            .await
527    }
528
529    pub async fn find_consumer_id_list(
530        &mut self,
531        topic: &CheetahString,
532        group: &CheetahString,
533    ) -> Option<Vec<CheetahString>> {
534        let mut broker_addr = self.find_broker_addr_by_topic(topic).await;
535        if broker_addr.is_none() {
536            self.update_topic_route_info_from_name_server_topic(topic).await;
537            broker_addr = self.find_broker_addr_by_topic(topic).await;
538        }
539        if let Some(broker_addr) = broker_addr {
540            match self
541                .mq_client_api_impl
542                .as_mut()
543                .unwrap()
544                .get_consumer_id_list_by_group(broker_addr.as_str(), group, self.client_config.mq_client_api_timeout)
545                .await
546            {
547                Ok(value) => return Some(value),
548                Err(e) => {
549                    warn!(
550                        "getConsumerIdListByGroup exception,{}  {}, err:{}",
551                        broker_addr,
552                        group,
553                        e.to_string()
554                    );
555                }
556            }
557        }
558        None
559    }
560
561    pub async fn find_broker_addr_by_topic(&self, topic: &str) -> Option<CheetahString> {
562        if let Some(topic_route_data) = self.topic_route_table.get(topic) {
563            let brokers = &topic_route_data.value().broker_datas;
564            if !brokers.is_empty() {
565                let bd = brokers.choose(&mut rand::rng());
566                if let Some(bd) = bd {
567                    return bd.select_broker_addr();
568                }
569            }
570        }
571        None
572    }
573
574    pub async fn update_topic_route_info_from_name_server_default(
575        &mut self,
576        topic: &CheetahString,
577        is_default: bool,
578        producer_config: Option<&Arc<ProducerConfig>>,
579    ) -> bool {
580        let lock = self.lock_namesrv.lock().await;
581        let topic_route_data = if let (true, Some(producer_config)) = (is_default, producer_config) {
582            let mut result = match self
583                .mq_client_api_impl
584                .as_mut()
585                .unwrap()
586                .get_default_topic_route_info_from_name_server(self.client_config.mq_client_api_timeout)
587                .await
588            {
589                Ok(value) => value,
590                Err(e) => {
591                    error!(
592                        "get_default_topic_route_info_from_name_server failed, topic: {}, error: {}",
593                        topic, e
594                    );
595                    None
596                }
597            };
598            if let Some(topic_route_data) = result.as_mut() {
599                for data in topic_route_data.queue_datas.iter_mut() {
600                    let queue_nums = producer_config.default_topic_queue_nums().max(data.read_queue_nums);
601                    data.read_queue_nums = queue_nums;
602                    data.write_queue_nums = queue_nums;
603                }
604            }
605            result
606        } else {
607            self.mq_client_api_impl
608                .as_mut()
609                .unwrap()
610                .get_topic_route_info_from_name_server(topic, self.client_config.mq_client_api_timeout)
611                .await
612                .unwrap_or(None)
613        };
614        if let Some(mut topic_route_data) = topic_route_data {
615            let old = self.topic_route_table.get(topic).map(|entry| entry.value().clone());
616            let mut changed = topic_route_data.topic_route_data_changed(old.as_ref());
617            if !changed {
618                changed = self.is_need_update_topic_route_info(topic).await;
619            } else {
620                info!(
621                    "the topic[{}] route info changed, old[{:?}] ,new[{:?}]",
622                    topic, old, topic_route_data
623                )
624            }
625            if changed {
626                for bd in topic_route_data.broker_datas.iter() {
627                    self.broker_addr_table
628                        .insert(bd.broker_name().clone(), bd.broker_addrs().clone());
629                }
630
631                // Update endpoint map
632                {
633                    let mq_end_points =
634                        ClientMetadata::topic_route_data2endpoints_for_static_topic(topic, &topic_route_data);
635                    if let Some(mq_end_points) = mq_end_points {
636                        if !mq_end_points.is_empty() {
637                            self.topic_end_points_table.insert(topic.into(), mq_end_points);
638                        }
639                    }
640                }
641
642                // Update Pub info
643                {
644                    let mut publish_info = topic_route_data2topic_publish_info(topic, &mut topic_route_data);
645                    publish_info.have_topic_router_info = true;
646                    for mut entry in self.producer_table.iter_mut() {
647                        entry
648                            .value_mut()
649                            .update_topic_publish_info(topic.to_string(), Some(publish_info.clone()));
650                    }
651                }
652
653                // Update sub info
654                if !self.consumer_table.is_empty() {
655                    let subscribe_info = topic_route_data2topic_subscribe_info(topic, &topic_route_data);
656
657                    let consumers: Vec<_> = self.consumer_table.iter().map(|entry| entry.value().clone()).collect();
658
659                    for consumer in consumers {
660                        consumer
661                            .update_topic_subscribe_info(topic.clone(), &subscribe_info)
662                            .await;
663                    }
664                }
665                let clone_topic_route_data = TopicRouteData::from_existing(&topic_route_data);
666                self.topic_route_table.insert(topic.clone(), clone_topic_route_data);
667                return true;
668            }
669        } else {
670            warn!(
671                "updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]",
672                topic, self.client_id
673            );
674        }
675
676        drop(lock);
677        false
678    }
679
680    async fn is_need_update_topic_route_info(&self, topic: &CheetahString) -> bool {
681        for entry in self.producer_table.iter() {
682            if entry.value().is_publish_topic_need_update(topic) {
683                return true;
684            }
685        }
686
687        for entry in self.consumer_table.iter() {
688            if entry.value().is_subscribe_topic_need_update(topic).await {
689                return true;
690            }
691        }
692        false
693    }
694
695    pub async fn persist_all_consumer_offset(&mut self) {
696        for entry in self.consumer_table.iter() {
697            entry.value().persist_consumer_offset().await;
698        }
699    }
700
701    pub async fn clean_offline_broker(&mut self) {
702        let lock = self
703            .lock_namesrv
704            .try_lock_timeout(Duration::from_millis(LOCK_TIMEOUT_MILLIS))
705            .await;
706        if let Some(lock) = lock {
707            let mut updated_table = HashMap::new();
708            let mut broker_name_set = HashSet::new();
709
710            for broker_entry in self.broker_addr_table.iter() {
711                let (broker_name, one_table) = broker_entry.pair();
712                let mut clone_addr_table = one_table.clone();
713                let mut remove_id_set = HashSet::new();
714                for (id, addr) in one_table.iter() {
715                    if !self.is_broker_addr_exist_in_topic_route_table(addr).await {
716                        remove_id_set.insert(*id);
717                    }
718                }
719                clone_addr_table.retain(|k, _| !remove_id_set.contains(k));
720                if clone_addr_table.is_empty() {
721                    info!("the broker[{}] name's host is offline, remove it", broker_name);
722                    broker_name_set.insert(broker_name.clone());
723                } else {
724                    updated_table.insert(broker_name.clone(), clone_addr_table);
725                }
726            }
727
728            // Remove offline brokers and update with new data
729            for broker_name in broker_name_set {
730                self.broker_addr_table.remove(&broker_name);
731            }
732            for (broker_name, addrs) in updated_table {
733                self.broker_addr_table.insert(broker_name, addrs);
734            }
735        }
736    }
737    pub async fn send_heartbeat_to_all_broker_with_lock(&self) -> bool {
738        let _guard = match self.lock_heartbeat.try_lock().await {
739            Some(g) => g,
740            None => {
741                warn!("lock heartBeat, but failed. [{}]", self.client_id);
742                return false;
743            }
744        };
745
746        // Check if concurrent heartbeat is enabled
747        if self.client_config.enable_concurrent_heartbeat {
748            return self.send_heartbeat_to_all_broker_concurrently().await;
749        }
750
751        // Use V2 or V1 protocol based on configuration
752        if self.client_config.use_heartbeat_v2 {
753            self.send_heartbeat_to_all_broker_v2(false).await
754        } else {
755            self.send_heartbeat_to_all_broker().await
756        }
757    }
758
759    pub async fn send_heartbeat_to_all_broker_with_lock_v2(&self, is_rebalance: bool) -> bool {
760        let _guard = match self.lock_heartbeat.try_lock_timeout(Duration::from_secs(2)).await {
761            Some(g) => g,
762            None => {
763                warn!("lock heartBeat, but failed. [{}]", self.client_id);
764                return false;
765            }
766        };
767
768        if self.client_config.use_heartbeat_v2 {
769            self.send_heartbeat_to_all_broker_v2(is_rebalance).await
770        } else {
771            self.send_heartbeat_to_all_broker().await
772        }
773    }
774
775    pub fn get_mq_client_api_impl(&self) -> ArcMut<MQClientAPIImpl> {
776        self.mq_client_api_impl.as_ref().unwrap().clone()
777    }
778
779    pub async fn pull_message_from_broker(
780        &self,
781        broker_addr: &str,
782        request_header: PullMessageRequestHeader,
783        timeout_millis: u64,
784    ) -> rocketmq_error::RocketMQResult<PullResult> {
785        struct NoopPullCallback;
786
787        impl PullCallback for NoopPullCallback {
788            async fn on_success(&mut self, _pull_result: PullResultExt) {}
789
790            fn on_exception(&mut self, _e: Box<dyn std::error::Error + Send>) {}
791        }
792
793        let api_impl = self
794            .mq_client_api_impl
795            .as_ref()
796            .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?;
797
798        let mut result = MQClientAPIImpl::pull_message(
799            api_impl.clone(),
800            CheetahString::from(broker_addr),
801            request_header,
802            timeout_millis,
803            CommunicationMode::Sync,
804            NoopPullCallback,
805        )
806        .await?
807        .ok_or_else(|| rocketmq_error::RocketMQError::Internal("pull_message returned None in sync mode".into()))?;
808
809        if result.pull_result.pull_status == PullStatus::Found {
810            if let Some(mut message_binary) = result.message_binary.take() {
811                let messages = message_decoder::decodes_batch(&mut message_binary, true, true);
812                result
813                    .pull_result
814                    .set_msg_found_list(Some(messages.into_iter().map(ArcMut::new).collect()));
815            }
816        }
817
818        Ok(result.pull_result)
819    }
820
821    pub async fn query_consumer_offset(
822        &self,
823        broker_addr: &str,
824        request_header: QueryConsumerOffsetRequestHeader,
825        timeout_millis: u64,
826    ) -> rocketmq_error::RocketMQResult<i64> {
827        let api_impl = self
828            .mq_client_api_impl
829            .as_ref()
830            .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?;
831        api_impl
832            .clone()
833            .query_consumer_offset(broker_addr, request_header, timeout_millis)
834            .await
835    }
836
837    pub async fn update_consumer_offset(
838        &self,
839        broker_addr: &CheetahString,
840        request_header: UpdateConsumerOffsetRequestHeader,
841        timeout_millis: u64,
842    ) -> rocketmq_error::RocketMQResult<()> {
843        let api_impl = self
844            .mq_client_api_impl
845            .as_ref()
846            .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?;
847        api_impl
848            .clone()
849            .update_consumer_offset(broker_addr, request_header, timeout_millis)
850            .await
851    }
852
853    pub async fn consumer_send_message_back(
854        &mut self,
855        broker_addr: &str,
856        broker_name: &str,
857        message: &MessageExt,
858        consumer_group: &str,
859        delay_level: i32,
860        timeout_millis: u64,
861        max_consume_retry_times: i32,
862    ) -> rocketmq_error::RocketMQResult<()> {
863        let api_impl = self
864            .mq_client_api_impl
865            .as_ref()
866            .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?;
867        api_impl
868            .clone()
869            .consumer_send_message_back(
870                broker_addr,
871                broker_name,
872                message,
873                consumer_group,
874                delay_level,
875                timeout_millis,
876                max_consume_retry_times,
877            )
878            .await
879    }
880
881    pub async fn get_max_offset(
882        &self,
883        broker_addr: &str,
884        message_queue: &MessageQueue,
885        timeout_millis: u64,
886    ) -> rocketmq_error::RocketMQResult<i64> {
887        let api_impl = self
888            .mq_client_api_impl
889            .as_ref()
890            .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?;
891        api_impl
892            .clone()
893            .get_max_offset(broker_addr, message_queue, timeout_millis)
894            .await
895    }
896
897    pub async fn get_min_offset(
898        &self,
899        broker_addr: &str,
900        message_queue: &MessageQueue,
901        timeout_millis: u64,
902    ) -> rocketmq_error::RocketMQResult<i64> {
903        let api_impl = self
904            .mq_client_api_impl
905            .as_ref()
906            .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?;
907        api_impl
908            .clone()
909            .get_min_offset(broker_addr, message_queue, timeout_millis)
910            .await
911    }
912
913    pub async fn search_offset_by_timestamp(
914        &self,
915        broker_addr: &str,
916        message_queue: &MessageQueue,
917        timestamp: i64,
918        boundary_type: BoundaryType,
919        timeout_millis: u64,
920    ) -> rocketmq_error::RocketMQResult<i64> {
921        let api_impl = self
922            .mq_client_api_impl
923            .as_ref()
924            .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?;
925        api_impl
926            .clone()
927            .search_offset_by_timestamp(broker_addr, message_queue, timestamp, boundary_type, timeout_millis)
928            .await
929    }
930
931    pub async fn get_topic_config(
932        &self,
933        broker_addr: &CheetahString,
934        topic: CheetahString,
935        timeout_millis: u64,
936    ) -> rocketmq_error::RocketMQResult<TopicConfig> {
937        let request_header = GetTopicConfigRequestHeader {
938            topic,
939            topic_request_header: None,
940        };
941        let api_impl = self
942            .mq_client_api_impl
943            .as_ref()
944            .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?;
945        let topic_mapping = api_impl
946            .get_topic_config(broker_addr, request_header, timeout_millis)
947            .await?;
948        Ok(topic_mapping.topic_config)
949    }
950
951    pub async fn get_subscription_group_config(
952        &self,
953        broker_addr: &CheetahString,
954        group: CheetahString,
955        timeout_millis: u64,
956    ) -> rocketmq_error::RocketMQResult<SubscriptionGroupConfig> {
957        let api_impl = self
958            .mq_client_api_impl
959            .as_ref()
960            .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?;
961        api_impl
962            .get_subscription_group_config(broker_addr, group, timeout_millis)
963            .await
964    }
965
966    pub async fn get_broker_name_from_message_queue(&self, message_queue: &MessageQueue) -> CheetahString {
967        if let Some(broker_name) = self.topic_end_points_table.get(message_queue.topic_str()) {
968            if let Some(addr) = broker_name.value().get(message_queue) {
969                return addr.clone();
970            }
971        }
972        message_queue.broker_name().clone()
973    }
974
975    pub fn find_broker_address_in_publish(&self, broker_name: &CheetahString) -> Option<CheetahString> {
976        if broker_name.is_empty() {
977            return None;
978        }
979        let map = self.broker_addr_table.get(broker_name);
980        if let Some(map) = map {
981            return map.get(&(mix_all::MASTER_ID)).cloned();
982        }
983        None
984    }
985
986    async fn send_heartbeat_to_all_broker_v2(&self, is_rebalance: bool) -> bool {
987        let heartbeat_data_with_sub = self.prepare_heartbeat_data(false).await;
988        let producer_empty = heartbeat_data_with_sub.producer_data_set.is_empty();
989        let consumer_empty = heartbeat_data_with_sub.consumer_data_set.is_empty();
990
991        if producer_empty && consumer_empty {
992            warn!(
993                "sendHeartbeatToAllBrokerV2 sending heartbeat, but no consumer and no producer. [{}]",
994                self.client_id
995            );
996            return false;
997        }
998
999        if self.broker_addr_table.is_empty() {
1000            return false;
1001        }
1002
1003        // Reset fingerprint map on rebalance
1004        if is_rebalance {
1005            self.reset_broker_addr_heartbeat_fingerprint_map().await;
1006        }
1007
1008        // Compute fingerprint for current heartbeat
1009        let current_fingerprint = heartbeat_data_with_sub.compute_heartbeat_fingerprint();
1010
1011        // Send to all brokers
1012        for broker_entry in self.broker_addr_table.iter() {
1013            let (broker_name, broker_addrs) = broker_entry.pair();
1014            if broker_addrs.is_empty() {
1015                continue;
1016            }
1017
1018            for (id, addr) in broker_addrs.iter() {
1019                if addr.is_empty() {
1020                    continue;
1021                }
1022                // Skip non-master brokers for consumer-only clients
1023                if consumer_empty && *id != mix_all::MASTER_ID {
1024                    continue;
1025                }
1026
1027                // Clone heartbeat data for this broker
1028                let mut heartbeat_data = heartbeat_data_with_sub.clone();
1029                heartbeat_data.heartbeat_fingerprint = current_fingerprint;
1030
1031                self.send_heartbeat_to_broker_v2(*id, broker_name, addr, heartbeat_data)
1032                    .await;
1033            }
1034        }
1035
1036        true
1037    }
1038
1039    async fn reset_broker_addr_heartbeat_fingerprint_map(&self) {
1040        self.broker_heartbeat_fingerprint_table.clear();
1041    }
1042
1043    /// Send heartbeat to all brokers concurrently
1044    async fn send_heartbeat_to_all_broker_concurrently(&self) -> bool {
1045        let heartbeat_data = self.prepare_heartbeat_data(false).await;
1046        let producer_empty = heartbeat_data.producer_data_set.is_empty();
1047        let consumer_empty = heartbeat_data.consumer_data_set.is_empty();
1048
1049        if producer_empty && consumer_empty {
1050            warn!(
1051                "sending heartbeat concurrently, but no consumer and no producer. [{}]",
1052                self.client_id
1053            );
1054            return false;
1055        }
1056
1057        // Collect broker list without holding lock during task execution
1058        let broker_list: Vec<(u64, CheetahString, CheetahString)> = {
1059            if self.broker_addr_table.is_empty() {
1060                return false;
1061            }
1062
1063            let mut list = Vec::new();
1064            for broker_entry in self.broker_addr_table.iter() {
1065                let (broker_name, broker_addrs) = broker_entry.pair();
1066                if broker_addrs.is_empty() {
1067                    continue;
1068                }
1069
1070                for (id, addr) in broker_addrs.iter() {
1071                    if addr.is_empty() {
1072                        continue;
1073                    }
1074                    // Skip non-master brokers for consumer-only clients
1075                    if consumer_empty && *id != mix_all::MASTER_ID {
1076                        continue;
1077                    }
1078                    list.push((*id, broker_name.clone(), addr.clone()));
1079                }
1080            }
1081            list
1082        };
1083
1084        if broker_list.is_empty() {
1085            return false;
1086        }
1087
1088        let task_count = broker_list.len();
1089
1090        // Collect all heartbeat tasks
1091        let mut tasks = Vec::new();
1092        for (broker_id, broker_name, addr) in broker_list {
1093            // Clone necessary data for the async task
1094            let heartbeat_data = heartbeat_data.clone();
1095            let client_id = self.client_id.clone();
1096            let mq_client_api = self.mq_client_api_impl.as_ref().unwrap().clone();
1097            let timeout = self.client_config.mq_client_api_timeout;
1098            let send_heartbeat_times_total = self.send_heartbeat_times_total.clone();
1099            let topic_route_table_clone = self.topic_route_table.clone();
1100
1101            // Spawn independent task for each broker
1102            // Returns: (broker_name, addr, version, success)
1103            let task = tokio::spawn(async move {
1104                match mq_client_api
1105                    .mut_from_ref()
1106                    .send_heartbeat(&addr, &heartbeat_data, timeout)
1107                    .await
1108                {
1109                    Ok((version, _response)) => {
1110                        let times = send_heartbeat_times_total.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1111                        if times % 20 == 0 {
1112                            info!(
1113                                "send heart beat to broker[{} {} {}] success (concurrent)",
1114                                broker_name, broker_id, addr
1115                            );
1116                        }
1117                        (broker_name, addr, Some(version), true)
1118                    }
1119                    Err(_) => {
1120                        // Check if broker is in name server
1121                        let is_in_ns = topic_route_table_clone.iter().any(|route_entry| {
1122                            route_entry.value().broker_datas.iter().any(|bd| {
1123                                bd.broker_addrs()
1124                                    .iter()
1125                                    .any(|(_, broker_addr)| broker_addr.as_str() == addr.as_str())
1126                            })
1127                        });
1128
1129                        if is_in_ns {
1130                            warn!(
1131                                "send heart beat to broker[{} {} {}] failed (concurrent)",
1132                                broker_name, broker_id, addr
1133                            );
1134                        } else {
1135                            warn!(
1136                                "send heart beat to broker[{} {} {}] exception, because the broker not up, forget it \
1137                                 (concurrent)",
1138                                broker_name, broker_id, addr
1139                            );
1140                        }
1141                        (broker_name, addr, None, false)
1142                    }
1143                }
1144            });
1145
1146            tasks.push(task);
1147        }
1148
1149        // Wait for all tasks with timeout (3 seconds)
1150        let results = match tokio::time::timeout(Duration::from_millis(3000), future::join_all(tasks)).await {
1151            Ok(results) => results,
1152            Err(_) => {
1153                warn!(
1154                    "Concurrent heartbeat timeout after 3000ms for client [{}]",
1155                    self.client_id
1156                );
1157                return false;
1158            }
1159        };
1160
1161        // Update broker version table after all tasks complete
1162        // This avoids lock contention during task execution
1163        let mut success_count = 0;
1164
1165        for result in results {
1166            match result {
1167                Ok((broker_name, addr, version_opt, success)) => {
1168                    if success {
1169                        success_count += 1;
1170                        if let Some(version) = version_opt {
1171                            self.broker_version_table
1172                                .entry(broker_name.clone())
1173                                .or_default()
1174                                .insert(addr, version);
1175                        }
1176                    }
1177                }
1178                Err(e) => {
1179                    warn!("Concurrent heartbeat task panicked: {:?}", e);
1180                }
1181            }
1182        }
1183
1184        info!(
1185            "Concurrent heartbeat completed for client [{}]: {}/{} succeeded",
1186            self.client_id, success_count, task_count
1187        );
1188
1189        // Return true if at least one heartbeat succeeded
1190        success_count > 0
1191    }
1192
1193    async fn send_heartbeat_to_all_broker(&self) -> bool {
1194        let heartbeat_data = self.prepare_heartbeat_data(false).await;
1195        let producer_empty = heartbeat_data.producer_data_set.is_empty();
1196        let consumer_empty = heartbeat_data.consumer_data_set.is_empty();
1197        if producer_empty && consumer_empty {
1198            warn!(
1199                "sending heartbeat, but no consumer and no producer. [{}]",
1200                self.client_id
1201            );
1202            return false;
1203        }
1204        if self.broker_addr_table.is_empty() {
1205            return false;
1206        }
1207        for broker_entry in self.broker_addr_table.iter() {
1208            let (broker_name, broker_addrs) = broker_entry.pair();
1209            if broker_addrs.is_empty() {
1210                continue;
1211            }
1212            for (id, addr) in broker_addrs.iter() {
1213                if addr.is_empty() {
1214                    continue;
1215                }
1216                if consumer_empty && *id != mix_all::MASTER_ID {
1217                    continue;
1218                }
1219                self.send_heartbeat_to_broker_inner(*id, broker_name, addr, &heartbeat_data)
1220                    .await;
1221            }
1222        }
1223
1224        true
1225    }
1226
1227    pub async fn send_heartbeat_to_broker(
1228        &self,
1229        id: u64,
1230        broker_name: &CheetahString,
1231        addr: &CheetahString,
1232        strict_lock_mode: bool,
1233    ) -> bool {
1234        let _guard = match self.lock_heartbeat.try_lock().await {
1235            Some(g) => g,
1236            None => {
1237                if strict_lock_mode {
1238                    warn!("lock heartBeat, but failed. [{}]", self.client_id);
1239                }
1240                return false;
1241            }
1242        };
1243
1244        let heartbeat_data = self.prepare_heartbeat_data(false).await;
1245        let producer_empty = heartbeat_data.producer_data_set.is_empty();
1246        let consumer_empty = heartbeat_data.consumer_data_set.is_empty();
1247        if producer_empty && consumer_empty {
1248            warn!(
1249                "sending heartbeat, but no consumer and no producer. [{}]",
1250                self.client_id
1251            );
1252            return false;
1253        }
1254
1255        if self.client_config.use_heartbeat_v2 {
1256            self.send_heartbeat_to_broker_v2(id, broker_name, addr, heartbeat_data)
1257                .await
1258        } else {
1259            let (result, _) = self
1260                .send_heartbeat_to_broker_inner(id, broker_name, addr, &heartbeat_data)
1261                .await;
1262            result
1263        }
1264    }
1265
1266    /// Send HeartbeatV2 to broker
1267    async fn send_heartbeat_to_broker_v2(
1268        &self,
1269        id: u64,
1270        broker_name: &CheetahString,
1271        addr: &CheetahString,
1272        mut heartbeat_data_with_sub: HeartbeatData,
1273    ) -> bool {
1274        // Calculate current fingerprint
1275        let current_fingerprint = heartbeat_data_with_sub.compute_heartbeat_fingerprint();
1276        heartbeat_data_with_sub.heartbeat_fingerprint = current_fingerprint;
1277
1278        // Check if this broker supports V2
1279        let broker_support_v2 = self.broker_support_v2_heartbeat_set.contains_key(addr);
1280
1281        // Get last fingerprint for this broker
1282        let last_fingerprint = self
1283            .broker_heartbeat_fingerprint_table
1284            .get(addr)
1285            .map(|entry| *entry.value());
1286
1287        // Determine if we should send minimal heartbeat (without subscription)
1288        let should_send_minimal =
1289            broker_support_v2 && last_fingerprint.is_some() && last_fingerprint.unwrap() == current_fingerprint;
1290
1291        let heartbeat_data = if should_send_minimal {
1292            // Send minimal heartbeat without subscription data
1293            let mut minimal = self.prepare_heartbeat_data(true).await;
1294            minimal.heartbeat_fingerprint = current_fingerprint;
1295            minimal
1296        } else {
1297            // Send full heartbeat with subscription data
1298            heartbeat_data_with_sub.clone()
1299        };
1300
1301        // Send heartbeat and get result with V2 support info
1302        let (result, support_v2) = self
1303            .send_heartbeat_to_broker_inner(id, broker_name, addr, &heartbeat_data)
1304            .await;
1305
1306        if result {
1307            if let Some(support_v2) = support_v2 {
1308                if support_v2 {
1309                    // Update V2 support set
1310                    self.broker_support_v2_heartbeat_set.insert(addr.clone(), ());
1311
1312                    // Update fingerprint cache only when sending full data
1313                    if !should_send_minimal {
1314                        self.broker_heartbeat_fingerprint_table
1315                            .insert(addr.clone(), current_fingerprint);
1316                    }
1317                } else {
1318                    // Broker doesn't support V2, remove from set and clear fingerprint
1319                    self.broker_support_v2_heartbeat_set.remove(addr);
1320                    self.broker_heartbeat_fingerprint_table.remove(addr);
1321
1322                    warn!("Broker {} does not support HeartbeatV2, downgrading to V1", addr);
1323                }
1324            }
1325        }
1326
1327        result
1328    }
1329
1330    async fn send_heartbeat_to_broker_inner(
1331        &self,
1332        id: u64,
1333        broker_name: &CheetahString,
1334        addr: &CheetahString,
1335        heartbeat_data: &HeartbeatData,
1336    ) -> (bool, Option<bool>) {
1337        match self
1338            .mq_client_api_impl
1339            .as_ref()
1340            .unwrap()
1341            .mut_from_ref()
1342            .send_heartbeat(addr, heartbeat_data, self.client_config.mq_client_api_timeout)
1343            .await
1344        {
1345            Ok((version, response)) => {
1346                self.broker_version_table
1347                    .entry(broker_name.clone())
1348                    .or_default()
1349                    .insert(addr.clone(), version);
1350
1351                let times = self
1352                    .send_heartbeat_times_total
1353                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1354                if times % 20 == 0 {
1355                    info!("send heart beat to broker[{} {} {}] success", broker_name, id, addr);
1356                }
1357
1358                // Check if broker supports HeartbeatV2
1359                let support_v2 = response.and_then(|resp| {
1360                    resp.ext_fields().and_then(|fields| {
1361                        fields
1362                            .get(rocketmq_common::common::mix_all::IS_SUPPORT_HEART_BEAT_V2)
1363                            .and_then(|v| v.parse::<bool>().ok())
1364                    })
1365                });
1366
1367                (true, support_v2)
1368            }
1369            Err(_) => {
1370                if self.is_broker_in_name_server(addr).await {
1371                    warn!("send heart beat to broker[{} {} {}] failed", broker_name, id, addr);
1372                } else {
1373                    warn!(
1374                        "send heart beat to broker[{} {} {}] exception, because the broker not up, forget it",
1375                        broker_name, id, addr
1376                    );
1377                }
1378                (false, None)
1379            }
1380        }
1381    }
1382
1383    async fn is_broker_in_name_server(&self, broker_name: &str) -> bool {
1384        for entry in self.topic_route_table.iter() {
1385            for bd in entry.value().broker_datas.iter() {
1386                for (_, value) in bd.broker_addrs().iter() {
1387                    if value.as_str() == broker_name {
1388                        return true;
1389                    }
1390                }
1391            }
1392        }
1393        false
1394    }
1395
1396    async fn prepare_heartbeat_data(&self, is_without_sub: bool) -> HeartbeatData {
1397        let mut heartbeat_data = HeartbeatData {
1398            client_id: self.client_id.clone(),
1399            ..Default::default()
1400        };
1401
1402        for entry in self.consumer_table.iter() {
1403            let mut consumer_data = ConsumerData {
1404                group_name: entry.value().group_name(),
1405                consume_type: entry.value().consume_type(),
1406                message_model: entry.value().message_model(),
1407                consume_from_where: entry.value().consume_from_where(),
1408                subscription_data_set: entry.value().subscriptions(),
1409                unit_mode: entry.value().is_unit_mode(),
1410            };
1411            if !is_without_sub {
1412                entry.value().subscriptions().iter().for_each(|sub| {
1413                    consumer_data.subscription_data_set.insert(sub.clone());
1414                });
1415            }
1416            heartbeat_data.consumer_data_set.insert(consumer_data);
1417        }
1418        for entry in self.producer_table.iter() {
1419            let producer_data = ProducerData {
1420                group_name: entry.key().clone(),
1421            };
1422            heartbeat_data.producer_data_set.insert(producer_data);
1423        }
1424        heartbeat_data.is_without_sub = is_without_sub;
1425        heartbeat_data
1426    }
1427
1428    pub async fn register_consumer(&mut self, group: &CheetahString, consumer: MQConsumerInnerImpl) -> bool {
1429        if self.consumer_table.contains_key(group) {
1430            warn!("the consumer group[{}] exist already.", group);
1431            return false;
1432        }
1433        self.consumer_table.insert(group.clone(), consumer);
1434        true
1435    }
1436
1437    pub async fn check_client_in_broker(&mut self) -> rocketmq_error::RocketMQResult<()> {
1438        for entry in self.consumer_table.iter() {
1439            let subscription_inner = entry.value().subscriptions();
1440            if subscription_inner.is_empty() {
1441                return Ok(());
1442            }
1443            for subscription_data in subscription_inner.iter() {
1444                if ExpressionType::is_tag_type(Some(subscription_data.expression_type.as_str())) {
1445                    continue;
1446                }
1447                let addr = self.find_broker_addr_by_topic(subscription_data.topic.as_str()).await;
1448                if let Some(addr) = addr {
1449                    match self
1450                        .mq_client_api_impl
1451                        .as_mut()
1452                        .unwrap()
1453                        .check_client_in_broker(
1454                            addr.as_str(),
1455                            entry.key().as_str(),
1456                            self.client_id.as_str(),
1457                            subscription_data,
1458                            self.client_config.mq_client_api_timeout,
1459                        )
1460                        .await
1461                    {
1462                        Ok(_) => {}
1463                        Err(e) => match e {
1464                            rocketmq_error::RocketMQError::IllegalArgument(_) => {
1465                                return Err(e);
1466                            }
1467                            _ => {
1468                                let desc = format!(
1469                                    "Check client in broker error, maybe because you use {} to filter message, but \
1470                                     server has not been upgraded to support!This error would not affect the launch \
1471                                     of consumer, but may has impact on message receiving if you have use the new \
1472                                     features which are not supported by server, please check the log!",
1473                                    subscription_data.expression_type
1474                                );
1475                            }
1476                        },
1477                    }
1478                }
1479            }
1480        }
1481
1482        Ok(())
1483    }
1484
1485    pub async fn do_rebalance(&mut self) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
1486        let mut balanced = true;
1487        for entry in self.consumer_table.iter() {
1488            match entry.value().try_rebalance().await {
1489                Ok(result) => {
1490                    if !result {
1491                        balanced = false;
1492                    }
1493                }
1494                Err(e) => {
1495                    error!(
1496                        "doRebalance for consumer group [{}] exception:{}",
1497                        entry.key(),
1498                        e.to_string()
1499                    );
1500                    balanced = false;
1501                }
1502            }
1503        }
1504        Ok(balanced)
1505    }
1506
1507    pub fn rebalance_later(&mut self, delay_millis: u64) {
1508        if delay_millis == 0 {
1509            self.rebalance_service.wakeup();
1510        } else {
1511            let service = self.rebalance_service.clone();
1512            tokio::spawn(async move {
1513                tokio::time::sleep(Duration::from_millis(delay_millis)).await;
1514                service.wakeup();
1515            });
1516        }
1517    }
1518
1519    pub async fn find_broker_address_in_subscribe(
1520        &mut self,
1521        broker_name: &CheetahString,
1522        broker_id: u64,
1523        only_this_broker: bool,
1524    ) -> Option<FindBrokerResult> {
1525        if broker_name.is_empty() {
1526            return None;
1527        }
1528
1529        let mut broker_addr: Option<CheetahString> = None;
1530        let mut slave = false;
1531        let mut found = false;
1532
1533        if let Some(map_ref) = self.broker_addr_table.get(broker_name) {
1534            let map = map_ref.value();
1535            if let Some(addr) = map.get(&broker_id) {
1536                broker_addr = Some(addr.clone());
1537                slave = broker_id != mix_all::MASTER_ID;
1538                found = true;
1539            } else if !found && broker_id != mix_all::MASTER_ID {
1540                if let Some(addr) = map.get(&(broker_id + 1)) {
1541                    broker_addr = Some(addr.clone());
1542                    found = true;
1543                }
1544            }
1545            if !found && !only_this_broker {
1546                if let Some((key, value)) = map.iter().next() {
1547                    broker_addr = Some(value.clone());
1548                    slave = *key != mix_all::MASTER_ID;
1549                    found = !value.is_empty();
1550                }
1551            }
1552        }
1553
1554        if found {
1555            let broker_addr = broker_addr?;
1556            let broker_version = self.find_broker_version(broker_name, broker_addr.as_str()).await;
1557            Some(FindBrokerResult {
1558                broker_addr,
1559                slave,
1560                broker_version,
1561            })
1562        } else {
1563            None
1564        }
1565    }
1566    async fn find_broker_version(&self, broker_name: &str, broker_addr: &str) -> i32 {
1567        if let Some(map) = self.broker_version_table.get(broker_name) {
1568            if let Some(version) = map.value().get(broker_addr) {
1569                return *version;
1570            }
1571        }
1572        0
1573    }
1574
1575    pub async fn select_consumer(&self, group: &str) -> Option<MQConsumerInnerImpl> {
1576        self.consumer_table.get(group).map(|entry| entry.value().clone())
1577    }
1578
1579    pub async fn select_producer(&self, group: &str) -> Option<MQProducerInnerImpl> {
1580        self.producer_table.get(group).map(|entry| entry.value().clone())
1581    }
1582
1583    pub async fn unregister_consumer(&mut self, group: impl Into<CheetahString>) -> bool {
1584        let group = group.into();
1585        if group.is_empty() {
1586            warn!("unregister_consumer: group name is empty");
1587            return false;
1588        }
1589
1590        let removed = self.consumer_table.remove(&group).is_some();
1591
1592        if removed {
1593            info!("unregister consumer [{}] OK", group);
1594            self.unregister_client(None, Some(group)).await;
1595            true
1596        } else {
1597            warn!("unregister consumer [{}] failed: not found in consumer table", group);
1598            false
1599        }
1600    }
1601
1602    pub async fn unregister_producer(&mut self, group: impl Into<CheetahString>) -> bool {
1603        let group = group.into();
1604        if group.is_empty() {
1605            warn!("unregister_producer: group name is empty");
1606            return false;
1607        }
1608
1609        let removed = self.producer_table.remove(&group).is_some();
1610
1611        if removed {
1612            info!("unregister producer [{}] OK", group);
1613            self.unregister_client(Some(group), None).await;
1614            true
1615        } else {
1616            warn!("unregister producer [{}] failed: not found in producer table", group);
1617            false
1618        }
1619    }
1620
1621    pub async fn unregister_admin_ext(&mut self, group: impl Into<CheetahString>) {
1622        let _ = self.admin_ext_table.remove(&group.into());
1623    }
1624    async fn unregister_client(
1625        &mut self,
1626        producer_group: Option<CheetahString>,
1627        consumer_group: Option<CheetahString>,
1628    ) {
1629        for broker_entry in self.broker_addr_table.iter() {
1630            let (broker_name, broker_addrs) = broker_entry.pair();
1631            for (id, addr) in broker_addrs.iter() {
1632                if let Err(err) = self
1633                    .mq_client_api_impl
1634                    .as_mut()
1635                    .unwrap()
1636                    .unregister_client(
1637                        addr,
1638                        self.client_id.clone(),
1639                        producer_group.clone(),
1640                        consumer_group.clone(),
1641                        self.client_config.mq_client_api_timeout,
1642                    )
1643                    .await
1644                {
1645                } else {
1646                    info!(
1647                        "unregister client[Producer: {:?} Consumer: {:?}] from broker[{} {} {}] success",
1648                        producer_group, consumer_group, broker_name, id, addr,
1649                    );
1650                }
1651            }
1652        }
1653    }
1654
1655    async fn is_broker_addr_exist_in_topic_route_table(&self, addr: &str) -> bool {
1656        for entry in self.topic_route_table.iter() {
1657            for bd in entry.value().broker_datas.iter() {
1658                for (_, value) in bd.broker_addrs().iter() {
1659                    if value.as_str() == addr {
1660                        return true;
1661                    }
1662                }
1663            }
1664        }
1665        false
1666    }
1667
1668    /// Queries the assignment for a given topic.
1669    ///
1670    /// This function attempts to find the broker address for the specified topic. If the broker
1671    /// address is not found, it updates the topic route information from the name server and
1672    /// retries. If the broker address is found, it queries the assignment from the broker.
1673    ///
1674    /// # Arguments
1675    ///
1676    /// * `topic` - A reference to a `CheetahString` representing the topic to query.
1677    /// * `consumer_group` - A reference to a `CheetahString` representing the consumer group.
1678    /// * `strategy_name` - A reference to a `CheetahString` representing the allocation strategy
1679    ///   name.
1680    /// * `message_model` - The message model to use for the query.
1681    /// * `timeout` - The timeout duration for the query.
1682    ///
1683    /// # Returns
1684    ///
1685    /// A `Result` containing an `Option` with a `HashSet` of `MessageQueueAssignment` if the query
1686    /// is successful, or an error if it fails.
1687    pub async fn query_assignment(
1688        &mut self,
1689        topic: &CheetahString,
1690        consumer_group: &CheetahString,
1691        strategy_name: &CheetahString,
1692        message_model: MessageModel,
1693        timeout: u64,
1694    ) -> rocketmq_error::RocketMQResult<Option<HashSet<MessageQueueAssignment>>> {
1695        // Try to find broker address
1696        let mut broker_addr = self.find_broker_addr_by_topic(topic).await;
1697
1698        // If not found, update and retry
1699        if broker_addr.is_none() {
1700            self.update_topic_route_info_from_name_server_topic(topic).await;
1701            broker_addr = self.find_broker_addr_by_topic(topic).await;
1702        }
1703        if let Some(broker_addr) = broker_addr {
1704            let client_id = self.client_id.clone();
1705            match self.mq_client_api_impl.as_mut() {
1706                Some(api_impl) => {
1707                    api_impl
1708                        .query_assignment(
1709                            &broker_addr,
1710                            topic.clone(),
1711                            consumer_group.clone(),
1712                            client_id,
1713                            strategy_name.clone(),
1714                            message_model,
1715                            timeout,
1716                        )
1717                        .await
1718                }
1719                None => Err(mq_client_err!("mq_client_api_impl is None")),
1720            }
1721        } else {
1722            Ok(None)
1723        }
1724    }
1725
1726    pub async fn consume_message_directly(
1727        &self,
1728        message: MessageExt,
1729        consumer_group: &CheetahString,
1730        broker_name: Option<CheetahString>,
1731    ) -> Option<ConsumeMessageDirectlyResult> {
1732        let consumer_inner = self.consumer_table.get(consumer_group);
1733        if let Some(entry) = consumer_inner {
1734            return entry.value().consume_message_directly(message, broker_name).await;
1735        }
1736
1737        None
1738    }
1739}
1740
1741#[allow(clippy::unnecessary_unwrap)]
1742pub fn topic_route_data2topic_publish_info(topic: &str, route: &mut TopicRouteData) -> TopicPublishInfo {
1743    let mut info = TopicPublishInfo {
1744        topic_route_data: Some(route.clone()),
1745        ..Default::default()
1746    };
1747    if route.order_topic_conf.is_some() && !route.order_topic_conf.as_ref().unwrap().is_empty() {
1748        let brokers = route
1749            .order_topic_conf
1750            .as_ref()
1751            .unwrap()
1752            .split(";")
1753            .map(|s| s.to_string())
1754            .collect::<Vec<String>>();
1755        for broker in brokers {
1756            let item = broker.split(":").collect::<Vec<&str>>();
1757            if item.len() == 2 {
1758                let queue_num = item[1].parse::<i32>().unwrap();
1759                for i in 0..queue_num {
1760                    let mq = MessageQueue::from_parts(topic, item[0], i);
1761                    info.message_queue_list.push(mq);
1762                }
1763            }
1764        }
1765        info.order_topic = true;
1766    } else if route.order_topic_conf.is_none()
1767        && route.topic_queue_mapping_by_broker.is_some()
1768        && !route.topic_queue_mapping_by_broker.as_ref().unwrap().is_empty()
1769    {
1770        info.order_topic = false;
1771        let mq_end_points = ClientMetadata::topic_route_data2endpoints_for_static_topic(topic, route);
1772        if let Some(mq_end_points) = mq_end_points {
1773            for (mq, broker_name) in mq_end_points {
1774                info.message_queue_list.push(mq);
1775            }
1776        }
1777        info.message_queue_list
1778            .sort_by(|a, b| match a.queue_id().cmp(&b.queue_id()) {
1779                Ordering::Less => std::cmp::Ordering::Less,
1780                Ordering::Equal => std::cmp::Ordering::Equal,
1781                Ordering::Greater => std::cmp::Ordering::Greater,
1782            });
1783    } else {
1784        route.queue_datas.sort();
1785        for queue_data in route.queue_datas.iter() {
1786            if PermName::is_writeable(queue_data.perm) {
1787                let mut broker_data = None;
1788                for bd in route.broker_datas.iter() {
1789                    if bd.broker_name() == queue_data.broker_name.as_str() {
1790                        broker_data = Some(bd.clone());
1791                        break;
1792                    }
1793                }
1794                if broker_data.is_none() {
1795                    continue;
1796                }
1797                if !broker_data
1798                    .as_ref()
1799                    .unwrap()
1800                    .broker_addrs()
1801                    .contains_key(&(mix_all::MASTER_ID))
1802                {
1803                    continue;
1804                }
1805                for i in 0..queue_data.write_queue_nums {
1806                    let mq = MessageQueue::from_parts(topic, queue_data.broker_name.as_str(), i as i32);
1807                    info.message_queue_list.push(mq);
1808                }
1809            }
1810        }
1811    }
1812    info
1813}
1814
1815pub fn topic_route_data2topic_subscribe_info(topic: &str, route: &TopicRouteData) -> HashSet<MessageQueue> {
1816    if let Some(ref topic_queue_mapping_by_broker) = route.topic_queue_mapping_by_broker {
1817        if !topic_queue_mapping_by_broker.is_empty() {
1818            let mq_endpoints = ClientMetadata::topic_route_data2endpoints_for_static_topic(topic, route);
1819            return mq_endpoints
1820                .unwrap_or_default()
1821                .keys()
1822                .cloned()
1823                .collect::<HashSet<MessageQueue>>();
1824        }
1825    }
1826    let mut mq_list = HashSet::new();
1827    for qd in &route.queue_datas {
1828        if PermName::is_readable(qd.perm) {
1829            for i in 0..qd.read_queue_nums {
1830                let mq = MessageQueue::from_parts(topic, qd.broker_name.as_str(), i as i32);
1831                mq_list.insert(mq);
1832            }
1833        }
1834    }
1835    mq_list
1836}