1use std::cmp::Ordering;
16use std::collections::HashMap;
17use std::collections::HashSet;
18use std::sync::atomic::AtomicI64;
19use std::sync::Arc;
20use std::time::Duration;
21
22use cheetah_string::CheetahString;
23use dashmap::DashMap;
24use futures::future;
25use rand::seq::IndexedRandom;
26use rocketmq_common::common::base::service_state::ServiceState;
27use rocketmq_common::common::boundary_type::BoundaryType;
28use rocketmq_common::common::config::TopicConfig;
29use rocketmq_common::common::constant::PermName;
30use rocketmq_common::common::filter::expression_type::ExpressionType;
31use rocketmq_common::common::message::message_decoder;
32use rocketmq_common::common::message::message_ext::MessageExt;
33use rocketmq_common::common::message::message_queue::MessageQueue;
34use rocketmq_common::common::message::message_queue_assignment::MessageQueueAssignment;
35use rocketmq_common::common::mix_all;
36use rocketmq_common::TimeUtils::current_millis;
37use rocketmq_remoting::base::connection_net_event::ConnectionNetEvent;
38use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;
39use rocketmq_remoting::protocol::header::get_topic_config_request_header::GetTopicConfigRequestHeader;
40use rocketmq_remoting::protocol::header::pull_message_request_header::PullMessageRequestHeader;
41use rocketmq_remoting::protocol::header::query_consumer_offset_request_header::QueryConsumerOffsetRequestHeader;
42use rocketmq_remoting::protocol::header::update_consumer_offset_header::UpdateConsumerOffsetRequestHeader;
43use rocketmq_remoting::protocol::heartbeat::consumer_data::ConsumerData;
44use rocketmq_remoting::protocol::heartbeat::heartbeat_data::HeartbeatData;
45use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
46use rocketmq_remoting::protocol::heartbeat::producer_data::ProducerData;
47use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData;
48use rocketmq_remoting::protocol::subscription::subscription_group_config::SubscriptionGroupConfig;
49use rocketmq_remoting::rpc::client_metadata::ClientMetadata;
50use rocketmq_remoting::runtime::config::client_config::TokioClientConfig;
51use rocketmq_remoting::runtime::RPCHook;
52use rocketmq_rust::schedule::simple_scheduler::ScheduledTaskManager;
53use rocketmq_rust::ArcMut;
54use rocketmq_rust::RocketMQTokioMutex;
55use tracing::error;
56use tracing::info;
57use tracing::warn;
58
59use crate::admin::mq_admin_ext_async_inner::MQAdminExtInnerImpl;
60use crate::base::client_config::ClientConfig;
61use crate::consumer::consumer_impl::pull_message_service::PullMessageService;
62use crate::consumer::consumer_impl::pull_request_ext::PullResultExt;
63use crate::consumer::consumer_impl::re_balance::rebalance_service::RebalanceService;
64use crate::consumer::mq_consumer_inner::MQConsumerInner;
65use crate::consumer::mq_consumer_inner::MQConsumerInnerImpl;
66use crate::consumer::pull_callback::PullCallback;
67use crate::consumer::pull_result::PullResult;
68use crate::consumer::pull_status::PullStatus;
69use crate::factory::client_tables::*;
70use crate::implementation::client_remoting_processor::ClientRemotingProcessor;
71use crate::implementation::communication_mode::CommunicationMode;
72use crate::implementation::find_broker_result::FindBrokerResult;
73use crate::implementation::mq_admin_impl::MQAdminImpl;
74use crate::implementation::mq_client_api_impl::MQClientAPIImpl;
75use crate::producer::default_mq_producer::DefaultMQProducer;
76use crate::producer::default_mq_producer::ProducerConfig;
77use crate::producer::producer_impl::mq_producer_inner::MQProducerInnerImpl;
78use crate::producer::producer_impl::topic_publish_info::TopicPublishInfo;
79use crate::stat::consumer_stats_manager::ConsumerStatsManager;
80
81const LOCK_TIMEOUT_MILLIS: u64 = 3000;
82
83pub struct MQClientInstance {
84 pub(crate) client_config: ArcMut<ClientConfig>,
85 pub(crate) client_id: CheetahString,
86 boot_timestamp: u64,
87 producer_table: ProducerTable,
92 consumer_table: ConsumerTable,
97 admin_ext_table: AdminExtTable,
102 pub(crate) mq_client_api_impl: Option<ArcMut<MQClientAPIImpl>>,
103 pub(crate) mq_admin_impl: ArcMut<MQAdminImpl>,
104 pub(crate) topic_route_table: TopicRouteTable,
105 topic_end_points_table: TopicEndPointsTable,
106 lock_namesrv: Arc<RocketMQTokioMutex<()>>,
107 lock_heartbeat: Arc<RocketMQTokioMutex<()>>,
108
109 service_state: ServiceState,
110 pub(crate) pull_message_service: ArcMut<PullMessageService>,
111 rebalance_service: RebalanceService,
112 pub(crate) default_producer: ArcMut<DefaultMQProducer>,
113 broker_addr_table: BrokerAddrTable,
114 broker_version_table: BrokerVersionTable,
115 send_heartbeat_times_total: Arc<AtomicI64>,
116 scheduled_task_manager: ScheduledTaskManager,
117 broker_heartbeat_fingerprint_table: BrokerHeartbeatFingerprintTable,
119 broker_support_v2_heartbeat_set: BrokerSupportV2HeartbeatSet,
121 consumer_stats_manager: ConsumerStatsManager,
122}
123
124impl MQClientInstance {
125 pub fn new_arc(
126 client_config: ClientConfig,
127 _instance_index: i32,
128 client_id: impl Into<CheetahString>,
129 rpc_hook: Option<Arc<dyn RPCHook>>,
130 ) -> ArcMut<MQClientInstance> {
131 let client_id = client_id.into();
132 let shared_config = ArcMut::new(client_config.clone());
133
134 let broker_addr_table = Arc::new(DashMap::default());
135 let producer_table = Arc::new(DashMap::default());
136 let consumer_table = Arc::new(DashMap::default());
137 let admin_ext_table = Arc::new(DashMap::default());
138 let topic_route_table = Arc::new(DashMap::default());
139 let topic_end_points_table = Arc::new(DashMap::default());
140 let broker_version_table = Arc::new(DashMap::default());
141 let broker_heartbeat_fingerprint_table = Arc::new(DashMap::default());
142 let broker_support_v2_heartbeat_set = Arc::new(DashMap::default());
143
144 let default_producer = ArcMut::new(
145 DefaultMQProducer::builder()
146 .producer_group(mix_all::CLIENT_INNER_PRODUCER_GROUP)
147 .client_config(client_config.clone())
148 .build(),
149 );
150 let mut instance = ArcMut::new(MQClientInstance {
151 client_config: shared_config,
152 client_id,
153 boot_timestamp: current_millis(),
154 producer_table,
155 consumer_table,
156 admin_ext_table,
157 mq_client_api_impl: None,
158 mq_admin_impl: ArcMut::new(MQAdminImpl::new()),
159 topic_route_table,
160 topic_end_points_table,
161 lock_namesrv: Arc::default(),
162 lock_heartbeat: Arc::default(),
163 service_state: ServiceState::CreateJust,
164 pull_message_service: ArcMut::new(PullMessageService::new()),
165 rebalance_service: RebalanceService::new(),
166 default_producer,
167 broker_addr_table,
168 broker_version_table,
169 send_heartbeat_times_total: Arc::new(AtomicI64::new(0)),
170 scheduled_task_manager: ScheduledTaskManager::new(),
171 broker_heartbeat_fingerprint_table,
172 broker_support_v2_heartbeat_set,
173 consumer_stats_manager: ConsumerStatsManager::new(),
174 });
175
176 let instance_clone = instance.clone();
178 instance.mq_admin_impl.set_client(instance_clone);
179
180 let (tx, mut rx) = tokio::sync::broadcast::channel::<ConnectionNetEvent>(16);
181
182 let mq_client_api_impl = ArcMut::new(MQClientAPIImpl::new(
183 Arc::new(TokioClientConfig::default()),
184 ClientRemotingProcessor::new(instance.clone()),
185 rpc_hook,
186 Arc::new(client_config.clone()),
187 Some(tx),
188 ));
189
190 if let Some(namesrv_addr) = client_config.namesrv_addr.as_deref() {
191 let api_impl = mq_client_api_impl.clone();
192 let addr = namesrv_addr.to_string();
193 tokio::task::block_in_place(|| {
195 tokio::runtime::Handle::current().block_on(async move {
196 api_impl.update_name_server_address_list(&addr).await;
197 })
198 });
199 }
200
201 instance.mq_client_api_impl = Some(mq_client_api_impl);
202
203 let weak_instance = ArcMut::downgrade(&instance);
205 tokio::spawn(async move {
206 while let Ok(value) = rx.recv().await {
207 if let Some(instance_) = weak_instance.upgrade() {
208 match value {
209 ConnectionNetEvent::CONNECTED(remote_address) => {
210 info!("ConnectionNetEvent CONNECTED");
211
212 let matched_brokers: Vec<_> = instance_
213 .broker_addr_table
214 .iter()
215 .flat_map(|entry| {
216 let (name, addrs) = entry.pair();
217 addrs
218 .iter()
219 .filter(|(_, addr)| addr.as_str() == remote_address.to_string().as_str())
220 .map(|(id, addr)| (*id, name.clone(), addr.clone()))
221 .collect::<Vec<_>>()
222 })
223 .collect();
224
225 for (id, broker_name, addr) in matched_brokers {
226 if instance_.send_heartbeat_to_broker(id, &broker_name, &addr, false).await {
227 instance_.re_balance_immediately();
228 }
229 }
230 }
231 ConnectionNetEvent::DISCONNECTED => {}
232 ConnectionNetEvent::EXCEPTION => {}
233 ConnectionNetEvent::IDLE => {}
234 }
235 }
236 }
237 warn!("ConnectionNetEvent recv error");
238 });
239 instance
240 }
241
242 pub fn consumer_stats_manager(&self) -> &ConsumerStatsManager {
244 &self.consumer_stats_manager
245 }
246
247 pub fn re_balance_immediately(&self) {
248 self.rebalance_service.wakeup();
249 }
250
251 pub fn re_balance_later(&self, delay_millis: Duration) {
252 if delay_millis <= Duration::from_millis(0) {
253 self.rebalance_service.wakeup();
254 } else {
255 let service = self.rebalance_service.clone();
256 tokio::spawn(async move {
257 tokio::time::sleep(delay_millis).await;
258 service.wakeup();
259 });
260 }
261 }
262
263 pub async fn start(&mut self, this: ArcMut<Self>) -> rocketmq_error::RocketMQResult<()> {
264 match self.service_state {
265 ServiceState::CreateJust => {
266 self.service_state = ServiceState::StartFailed;
267 if self.client_config.namesrv_addr.is_none() {
269 self.mq_client_api_impl
270 .as_mut()
271 .expect("mq_client_api_impl is None")
272 .fetch_name_server_addr()
273 .await;
274 }
275 self.mq_client_api_impl
277 .as_mut()
278 .expect("mq_client_api_impl is None")
279 .start()
280 .await;
281 self.start_scheduled_task(this.clone());
283 let instance = this.clone();
285 if let Err(e) = self.pull_message_service.start(instance).await {
286 error!("Failed to start pull message service: {:?}", e);
287 }
288 if let Err(e) = self.rebalance_service.start(this).await {
290 error!("Failed to start rebalance service: {:?}", e);
291 }
292 self.default_producer
295 .default_mqproducer_impl
296 .as_mut()
297 .unwrap()
298 .start_with_factory(false)
299 .await?;
300 self.consumer_stats_manager.start();
302 info!("the client factory[{}] start OK", self.client_id);
303 self.service_state = ServiceState::Running;
304 }
305 ServiceState::Running => {}
306 ServiceState::ShutdownAlready => {}
307 ServiceState::StartFailed => {
308 return Err(mq_client_err!(format!(
309 "The Factory object[{}] has been created before, and failed.",
310 self.client_id
311 )));
312 }
313 }
314 Ok(())
315 }
316
317 pub async fn shutdown(&mut self) {
318 match self.service_state {
319 ServiceState::CreateJust | ServiceState::ShutdownAlready | ServiceState::StartFailed => {
320 warn!(
321 "MQClientInstance shutdown called but state is {:?}, ignoring shutdown request",
322 self.service_state
323 );
324 return;
325 }
326 ServiceState::Running => {
327 info!(
328 "MQClientInstance[{}] shutdown starting, current state: Running",
329 self.client_id
330 );
331 }
332 }
333
334 info!("MQClientInstance[{}] shutting down rebalance service", self.client_id);
335 if let Err(e) = self.rebalance_service.shutdown(3000).await {
336 warn!("Failed to shutdown rebalance service: {:?}", e);
337 }
338
339 info!(
340 "MQClientInstance[{}] shutting down pull message service",
341 self.client_id
342 );
343 if let Err(e) = self.pull_message_service.shutdown_default().await {
344 warn!("Failed to shutdown pull message service: {:?}", e);
345 }
346
347 info!("MQClientInstance[{}] persisting all consumer offsets", self.client_id);
348 self.persist_all_consumer_offset().await;
349
350 info!("MQClientInstance[{}] shutting down default producer", self.client_id);
351 if let Some(producer_impl) = self.default_producer.default_mqproducer_impl.as_mut() {
352 let shutdown_future = producer_impl.shutdown_with_factory(false);
353 if let Err(e) = Box::pin(shutdown_future).await {
354 warn!("Failed to shutdown default producer: {:?}", e);
355 }
356 }
357
358 info!("MQClientInstance[{}] unregistering all consumers", self.client_id);
359 self.unregister_all_consumers().await;
360
361 info!("MQClientInstance[{}] unregistering all producers", self.client_id);
362 self.unregister_all_producers().await;
363
364 if self.mq_client_api_impl.is_some() {
365 info!(
366 "MQClientInstance[{}] network client shutdown (deferred to Drop)",
367 self.client_id
368 );
369 }
370
371 info!("MQClientInstance[{}] canceling scheduled tasks", self.client_id);
372 self.scheduled_task_manager.cancel_all();
373 info!(
374 "MQClientInstance[{}] scheduled tasks canceled, total canceled: {}",
375 self.client_id,
376 self.scheduled_task_manager.task_count()
377 );
378
379 info!("MQClientInstance[{}] clearing all registration tables", self.client_id);
380 self.producer_table.clear();
381 self.consumer_table.clear();
382 self.admin_ext_table.clear();
383
384 self.topic_route_table.clear();
385 self.topic_end_points_table.clear();
386 self.broker_addr_table.clear();
387 self.broker_version_table.clear();
388
389 self.service_state = ServiceState::ShutdownAlready;
390
391 info!("MQClientInstance[{}] shutdown completed successfully", self.client_id);
392 }
393
394 async fn unregister_all_producers(&mut self) {
396 let producer_groups: Vec<CheetahString> = self.producer_table.iter().map(|entry| entry.key().clone()).collect();
398
399 for group in producer_groups {
401 info!("Unregistering producer group: {}", group);
402 self.unregister_producer(group).await;
403 }
404 }
405
406 async fn unregister_all_consumers(&mut self) {
408 let consumer_groups: Vec<CheetahString> = self.consumer_table.iter().map(|entry| entry.key().clone()).collect();
410
411 for group in consumer_groups {
413 info!("Unregistering consumer group: {}", group);
414 self.unregister_consumer(group).await;
415 }
416 }
417
418 pub async fn register_producer(&mut self, group: &str, producer: MQProducerInnerImpl) -> bool {
419 if group.is_empty() {
420 return false;
421 }
422 if self.producer_table.contains_key(group) {
423 warn!("the producer group[{}] exist already.", group);
424 return false;
425 }
426 self.producer_table.insert(group.into(), producer);
427 true
428 }
429
430 pub async fn register_admin_ext(&mut self, group: &str, admin: MQAdminExtInnerImpl) -> bool {
431 if group.is_empty() {
432 return false;
433 }
434 if self.admin_ext_table.contains_key(group) {
435 warn!("the admin group[{}] exist already.", group);
436 return false;
437 }
438 self.admin_ext_table.insert(group.into(), admin);
439 true
440 }
441
442 fn start_scheduled_task(&mut self, this: ArcMut<Self>) {
443 info!("Starting scheduled tasks with ScheduledTaskManager");
444
445 if self.client_config.namesrv_addr.is_none() {
446 let mq_client_api_impl = self.mq_client_api_impl.as_ref().unwrap().clone();
447 self.scheduled_task_manager.add_fixed_rate_task_async(
448 Duration::from_secs(10),
449 Duration::from_secs(120),
450 async move |_token| {
451 let mut api = mq_client_api_impl.clone();
452 info!("ScheduledTask: fetchNameServerAddr");
453 api.fetch_name_server_addr().await;
454 Ok(())
455 },
456 );
457 }
458
459 let client_instance = this.clone();
460 let poll_name_server_interval = self.client_config.poll_name_server_interval;
461 self.scheduled_task_manager.add_fixed_rate_task_async(
462 Duration::from_millis(10),
463 Duration::from_millis(poll_name_server_interval as u64),
464 async move |_token| {
465 let mut instance = client_instance.clone();
466 info!("ScheduledTask: update_topic_route_info_from_name_server");
467 instance.update_topic_route_info_from_name_server().await;
468 Ok(())
469 },
470 );
471
472 let client_instance = this.clone();
473 let heartbeat_broker_interval = self.client_config.heartbeat_broker_interval;
474 self.scheduled_task_manager.add_fixed_rate_task_async(
475 Duration::from_secs(1),
476 Duration::from_millis(heartbeat_broker_interval as u64),
477 async move |_token| {
478 let mut instance = client_instance.clone();
479 info!("ScheduledTask: clean_offline_broker and send_heartbeat");
480 instance.clean_offline_broker().await;
481 instance.send_heartbeat_to_all_broker_with_lock().await;
482 Ok(())
483 },
484 );
485
486 let client_instance = this;
487 let persist_consumer_offset_interval = self.client_config.persist_consumer_offset_interval as u64;
488 self.scheduled_task_manager.add_fixed_rate_task_async(
489 Duration::from_secs(10),
490 Duration::from_millis(persist_consumer_offset_interval),
491 async move |_token| {
492 let mut instance = client_instance.clone();
493 info!("ScheduledTask: persistAllConsumerOffset");
494 instance.persist_all_consumer_offset().await;
495 Ok(())
496 },
497 );
498
499 info!(
500 "All scheduled tasks started, total tasks: {}",
501 self.scheduled_task_manager.task_count()
502 );
503 }
504
505 pub async fn update_topic_route_info_from_name_server(&mut self) {
506 let mut topic_list = HashSet::new();
507
508 for entry in self.producer_table.iter() {
509 topic_list.extend(entry.value().get_publish_topic_list());
510 }
511
512 for entry in self.consumer_table.iter() {
513 entry.value().subscriptions().iter().for_each(|sub| {
514 topic_list.insert(sub.topic.clone());
515 });
516 }
517
518 for topic in topic_list.iter() {
519 self.update_topic_route_info_from_name_server_topic(topic).await;
520 }
521 }
522
523 #[inline]
524 pub async fn update_topic_route_info_from_name_server_topic(&mut self, topic: &CheetahString) -> bool {
525 self.update_topic_route_info_from_name_server_default(topic, false, None)
526 .await
527 }
528
529 pub async fn find_consumer_id_list(
530 &mut self,
531 topic: &CheetahString,
532 group: &CheetahString,
533 ) -> Option<Vec<CheetahString>> {
534 let mut broker_addr = self.find_broker_addr_by_topic(topic).await;
535 if broker_addr.is_none() {
536 self.update_topic_route_info_from_name_server_topic(topic).await;
537 broker_addr = self.find_broker_addr_by_topic(topic).await;
538 }
539 if let Some(broker_addr) = broker_addr {
540 match self
541 .mq_client_api_impl
542 .as_mut()
543 .unwrap()
544 .get_consumer_id_list_by_group(broker_addr.as_str(), group, self.client_config.mq_client_api_timeout)
545 .await
546 {
547 Ok(value) => return Some(value),
548 Err(e) => {
549 warn!(
550 "getConsumerIdListByGroup exception,{} {}, err:{}",
551 broker_addr,
552 group,
553 e.to_string()
554 );
555 }
556 }
557 }
558 None
559 }
560
561 pub async fn find_broker_addr_by_topic(&self, topic: &str) -> Option<CheetahString> {
562 if let Some(topic_route_data) = self.topic_route_table.get(topic) {
563 let brokers = &topic_route_data.value().broker_datas;
564 if !brokers.is_empty() {
565 let bd = brokers.choose(&mut rand::rng());
566 if let Some(bd) = bd {
567 return bd.select_broker_addr();
568 }
569 }
570 }
571 None
572 }
573
574 pub async fn update_topic_route_info_from_name_server_default(
575 &mut self,
576 topic: &CheetahString,
577 is_default: bool,
578 producer_config: Option<&Arc<ProducerConfig>>,
579 ) -> bool {
580 let lock = self.lock_namesrv.lock().await;
581 let topic_route_data = if let (true, Some(producer_config)) = (is_default, producer_config) {
582 let mut result = match self
583 .mq_client_api_impl
584 .as_mut()
585 .unwrap()
586 .get_default_topic_route_info_from_name_server(self.client_config.mq_client_api_timeout)
587 .await
588 {
589 Ok(value) => value,
590 Err(e) => {
591 error!(
592 "get_default_topic_route_info_from_name_server failed, topic: {}, error: {}",
593 topic, e
594 );
595 None
596 }
597 };
598 if let Some(topic_route_data) = result.as_mut() {
599 for data in topic_route_data.queue_datas.iter_mut() {
600 let queue_nums = producer_config.default_topic_queue_nums().max(data.read_queue_nums);
601 data.read_queue_nums = queue_nums;
602 data.write_queue_nums = queue_nums;
603 }
604 }
605 result
606 } else {
607 self.mq_client_api_impl
608 .as_mut()
609 .unwrap()
610 .get_topic_route_info_from_name_server(topic, self.client_config.mq_client_api_timeout)
611 .await
612 .unwrap_or(None)
613 };
614 if let Some(mut topic_route_data) = topic_route_data {
615 let old = self.topic_route_table.get(topic).map(|entry| entry.value().clone());
616 let mut changed = topic_route_data.topic_route_data_changed(old.as_ref());
617 if !changed {
618 changed = self.is_need_update_topic_route_info(topic).await;
619 } else {
620 info!(
621 "the topic[{}] route info changed, old[{:?}] ,new[{:?}]",
622 topic, old, topic_route_data
623 )
624 }
625 if changed {
626 for bd in topic_route_data.broker_datas.iter() {
627 self.broker_addr_table
628 .insert(bd.broker_name().clone(), bd.broker_addrs().clone());
629 }
630
631 {
633 let mq_end_points =
634 ClientMetadata::topic_route_data2endpoints_for_static_topic(topic, &topic_route_data);
635 if let Some(mq_end_points) = mq_end_points {
636 if !mq_end_points.is_empty() {
637 self.topic_end_points_table.insert(topic.into(), mq_end_points);
638 }
639 }
640 }
641
642 {
644 let mut publish_info = topic_route_data2topic_publish_info(topic, &mut topic_route_data);
645 publish_info.have_topic_router_info = true;
646 for mut entry in self.producer_table.iter_mut() {
647 entry
648 .value_mut()
649 .update_topic_publish_info(topic.to_string(), Some(publish_info.clone()));
650 }
651 }
652
653 if !self.consumer_table.is_empty() {
655 let subscribe_info = topic_route_data2topic_subscribe_info(topic, &topic_route_data);
656
657 let consumers: Vec<_> = self.consumer_table.iter().map(|entry| entry.value().clone()).collect();
658
659 for consumer in consumers {
660 consumer
661 .update_topic_subscribe_info(topic.clone(), &subscribe_info)
662 .await;
663 }
664 }
665 let clone_topic_route_data = TopicRouteData::from_existing(&topic_route_data);
666 self.topic_route_table.insert(topic.clone(), clone_topic_route_data);
667 return true;
668 }
669 } else {
670 warn!(
671 "updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]",
672 topic, self.client_id
673 );
674 }
675
676 drop(lock);
677 false
678 }
679
680 async fn is_need_update_topic_route_info(&self, topic: &CheetahString) -> bool {
681 for entry in self.producer_table.iter() {
682 if entry.value().is_publish_topic_need_update(topic) {
683 return true;
684 }
685 }
686
687 for entry in self.consumer_table.iter() {
688 if entry.value().is_subscribe_topic_need_update(topic).await {
689 return true;
690 }
691 }
692 false
693 }
694
695 pub async fn persist_all_consumer_offset(&mut self) {
696 for entry in self.consumer_table.iter() {
697 entry.value().persist_consumer_offset().await;
698 }
699 }
700
701 pub async fn clean_offline_broker(&mut self) {
702 let lock = self
703 .lock_namesrv
704 .try_lock_timeout(Duration::from_millis(LOCK_TIMEOUT_MILLIS))
705 .await;
706 if let Some(lock) = lock {
707 let mut updated_table = HashMap::new();
708 let mut broker_name_set = HashSet::new();
709
710 for broker_entry in self.broker_addr_table.iter() {
711 let (broker_name, one_table) = broker_entry.pair();
712 let mut clone_addr_table = one_table.clone();
713 let mut remove_id_set = HashSet::new();
714 for (id, addr) in one_table.iter() {
715 if !self.is_broker_addr_exist_in_topic_route_table(addr).await {
716 remove_id_set.insert(*id);
717 }
718 }
719 clone_addr_table.retain(|k, _| !remove_id_set.contains(k));
720 if clone_addr_table.is_empty() {
721 info!("the broker[{}] name's host is offline, remove it", broker_name);
722 broker_name_set.insert(broker_name.clone());
723 } else {
724 updated_table.insert(broker_name.clone(), clone_addr_table);
725 }
726 }
727
728 for broker_name in broker_name_set {
730 self.broker_addr_table.remove(&broker_name);
731 }
732 for (broker_name, addrs) in updated_table {
733 self.broker_addr_table.insert(broker_name, addrs);
734 }
735 }
736 }
737 pub async fn send_heartbeat_to_all_broker_with_lock(&self) -> bool {
738 let _guard = match self.lock_heartbeat.try_lock().await {
739 Some(g) => g,
740 None => {
741 warn!("lock heartBeat, but failed. [{}]", self.client_id);
742 return false;
743 }
744 };
745
746 if self.client_config.enable_concurrent_heartbeat {
748 return self.send_heartbeat_to_all_broker_concurrently().await;
749 }
750
751 if self.client_config.use_heartbeat_v2 {
753 self.send_heartbeat_to_all_broker_v2(false).await
754 } else {
755 self.send_heartbeat_to_all_broker().await
756 }
757 }
758
759 pub async fn send_heartbeat_to_all_broker_with_lock_v2(&self, is_rebalance: bool) -> bool {
760 let _guard = match self.lock_heartbeat.try_lock_timeout(Duration::from_secs(2)).await {
761 Some(g) => g,
762 None => {
763 warn!("lock heartBeat, but failed. [{}]", self.client_id);
764 return false;
765 }
766 };
767
768 if self.client_config.use_heartbeat_v2 {
769 self.send_heartbeat_to_all_broker_v2(is_rebalance).await
770 } else {
771 self.send_heartbeat_to_all_broker().await
772 }
773 }
774
775 pub fn get_mq_client_api_impl(&self) -> ArcMut<MQClientAPIImpl> {
776 self.mq_client_api_impl.as_ref().unwrap().clone()
777 }
778
779 pub async fn pull_message_from_broker(
780 &self,
781 broker_addr: &str,
782 request_header: PullMessageRequestHeader,
783 timeout_millis: u64,
784 ) -> rocketmq_error::RocketMQResult<PullResult> {
785 struct NoopPullCallback;
786
787 impl PullCallback for NoopPullCallback {
788 async fn on_success(&mut self, _pull_result: PullResultExt) {}
789
790 fn on_exception(&mut self, _e: Box<dyn std::error::Error + Send>) {}
791 }
792
793 let api_impl = self
794 .mq_client_api_impl
795 .as_ref()
796 .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?;
797
798 let mut result = MQClientAPIImpl::pull_message(
799 api_impl.clone(),
800 CheetahString::from(broker_addr),
801 request_header,
802 timeout_millis,
803 CommunicationMode::Sync,
804 NoopPullCallback,
805 )
806 .await?
807 .ok_or_else(|| rocketmq_error::RocketMQError::Internal("pull_message returned None in sync mode".into()))?;
808
809 if result.pull_result.pull_status == PullStatus::Found {
810 if let Some(mut message_binary) = result.message_binary.take() {
811 let messages = message_decoder::decodes_batch(&mut message_binary, true, true);
812 result
813 .pull_result
814 .set_msg_found_list(Some(messages.into_iter().map(ArcMut::new).collect()));
815 }
816 }
817
818 Ok(result.pull_result)
819 }
820
821 pub async fn query_consumer_offset(
822 &self,
823 broker_addr: &str,
824 request_header: QueryConsumerOffsetRequestHeader,
825 timeout_millis: u64,
826 ) -> rocketmq_error::RocketMQResult<i64> {
827 let api_impl = self
828 .mq_client_api_impl
829 .as_ref()
830 .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?;
831 api_impl
832 .clone()
833 .query_consumer_offset(broker_addr, request_header, timeout_millis)
834 .await
835 }
836
837 pub async fn update_consumer_offset(
838 &self,
839 broker_addr: &CheetahString,
840 request_header: UpdateConsumerOffsetRequestHeader,
841 timeout_millis: u64,
842 ) -> rocketmq_error::RocketMQResult<()> {
843 let api_impl = self
844 .mq_client_api_impl
845 .as_ref()
846 .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?;
847 api_impl
848 .clone()
849 .update_consumer_offset(broker_addr, request_header, timeout_millis)
850 .await
851 }
852
853 pub async fn consumer_send_message_back(
854 &mut self,
855 broker_addr: &str,
856 broker_name: &str,
857 message: &MessageExt,
858 consumer_group: &str,
859 delay_level: i32,
860 timeout_millis: u64,
861 max_consume_retry_times: i32,
862 ) -> rocketmq_error::RocketMQResult<()> {
863 let api_impl = self
864 .mq_client_api_impl
865 .as_ref()
866 .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?;
867 api_impl
868 .clone()
869 .consumer_send_message_back(
870 broker_addr,
871 broker_name,
872 message,
873 consumer_group,
874 delay_level,
875 timeout_millis,
876 max_consume_retry_times,
877 )
878 .await
879 }
880
881 pub async fn get_max_offset(
882 &self,
883 broker_addr: &str,
884 message_queue: &MessageQueue,
885 timeout_millis: u64,
886 ) -> rocketmq_error::RocketMQResult<i64> {
887 let api_impl = self
888 .mq_client_api_impl
889 .as_ref()
890 .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?;
891 api_impl
892 .clone()
893 .get_max_offset(broker_addr, message_queue, timeout_millis)
894 .await
895 }
896
897 pub async fn get_min_offset(
898 &self,
899 broker_addr: &str,
900 message_queue: &MessageQueue,
901 timeout_millis: u64,
902 ) -> rocketmq_error::RocketMQResult<i64> {
903 let api_impl = self
904 .mq_client_api_impl
905 .as_ref()
906 .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?;
907 api_impl
908 .clone()
909 .get_min_offset(broker_addr, message_queue, timeout_millis)
910 .await
911 }
912
913 pub async fn search_offset_by_timestamp(
914 &self,
915 broker_addr: &str,
916 message_queue: &MessageQueue,
917 timestamp: i64,
918 boundary_type: BoundaryType,
919 timeout_millis: u64,
920 ) -> rocketmq_error::RocketMQResult<i64> {
921 let api_impl = self
922 .mq_client_api_impl
923 .as_ref()
924 .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?;
925 api_impl
926 .clone()
927 .search_offset_by_timestamp(broker_addr, message_queue, timestamp, boundary_type, timeout_millis)
928 .await
929 }
930
931 pub async fn get_topic_config(
932 &self,
933 broker_addr: &CheetahString,
934 topic: CheetahString,
935 timeout_millis: u64,
936 ) -> rocketmq_error::RocketMQResult<TopicConfig> {
937 let request_header = GetTopicConfigRequestHeader {
938 topic,
939 topic_request_header: None,
940 };
941 let api_impl = self
942 .mq_client_api_impl
943 .as_ref()
944 .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?;
945 let topic_mapping = api_impl
946 .get_topic_config(broker_addr, request_header, timeout_millis)
947 .await?;
948 Ok(topic_mapping.topic_config)
949 }
950
951 pub async fn get_subscription_group_config(
952 &self,
953 broker_addr: &CheetahString,
954 group: CheetahString,
955 timeout_millis: u64,
956 ) -> rocketmq_error::RocketMQResult<SubscriptionGroupConfig> {
957 let api_impl = self
958 .mq_client_api_impl
959 .as_ref()
960 .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?;
961 api_impl
962 .get_subscription_group_config(broker_addr, group, timeout_millis)
963 .await
964 }
965
966 pub async fn get_broker_name_from_message_queue(&self, message_queue: &MessageQueue) -> CheetahString {
967 if let Some(broker_name) = self.topic_end_points_table.get(message_queue.topic_str()) {
968 if let Some(addr) = broker_name.value().get(message_queue) {
969 return addr.clone();
970 }
971 }
972 message_queue.broker_name().clone()
973 }
974
975 pub fn find_broker_address_in_publish(&self, broker_name: &CheetahString) -> Option<CheetahString> {
976 if broker_name.is_empty() {
977 return None;
978 }
979 let map = self.broker_addr_table.get(broker_name);
980 if let Some(map) = map {
981 return map.get(&(mix_all::MASTER_ID)).cloned();
982 }
983 None
984 }
985
986 async fn send_heartbeat_to_all_broker_v2(&self, is_rebalance: bool) -> bool {
987 let heartbeat_data_with_sub = self.prepare_heartbeat_data(false).await;
988 let producer_empty = heartbeat_data_with_sub.producer_data_set.is_empty();
989 let consumer_empty = heartbeat_data_with_sub.consumer_data_set.is_empty();
990
991 if producer_empty && consumer_empty {
992 warn!(
993 "sendHeartbeatToAllBrokerV2 sending heartbeat, but no consumer and no producer. [{}]",
994 self.client_id
995 );
996 return false;
997 }
998
999 if self.broker_addr_table.is_empty() {
1000 return false;
1001 }
1002
1003 if is_rebalance {
1005 self.reset_broker_addr_heartbeat_fingerprint_map().await;
1006 }
1007
1008 let current_fingerprint = heartbeat_data_with_sub.compute_heartbeat_fingerprint();
1010
1011 for broker_entry in self.broker_addr_table.iter() {
1013 let (broker_name, broker_addrs) = broker_entry.pair();
1014 if broker_addrs.is_empty() {
1015 continue;
1016 }
1017
1018 for (id, addr) in broker_addrs.iter() {
1019 if addr.is_empty() {
1020 continue;
1021 }
1022 if consumer_empty && *id != mix_all::MASTER_ID {
1024 continue;
1025 }
1026
1027 let mut heartbeat_data = heartbeat_data_with_sub.clone();
1029 heartbeat_data.heartbeat_fingerprint = current_fingerprint;
1030
1031 self.send_heartbeat_to_broker_v2(*id, broker_name, addr, heartbeat_data)
1032 .await;
1033 }
1034 }
1035
1036 true
1037 }
1038
1039 async fn reset_broker_addr_heartbeat_fingerprint_map(&self) {
1040 self.broker_heartbeat_fingerprint_table.clear();
1041 }
1042
1043 async fn send_heartbeat_to_all_broker_concurrently(&self) -> bool {
1045 let heartbeat_data = self.prepare_heartbeat_data(false).await;
1046 let producer_empty = heartbeat_data.producer_data_set.is_empty();
1047 let consumer_empty = heartbeat_data.consumer_data_set.is_empty();
1048
1049 if producer_empty && consumer_empty {
1050 warn!(
1051 "sending heartbeat concurrently, but no consumer and no producer. [{}]",
1052 self.client_id
1053 );
1054 return false;
1055 }
1056
1057 let broker_list: Vec<(u64, CheetahString, CheetahString)> = {
1059 if self.broker_addr_table.is_empty() {
1060 return false;
1061 }
1062
1063 let mut list = Vec::new();
1064 for broker_entry in self.broker_addr_table.iter() {
1065 let (broker_name, broker_addrs) = broker_entry.pair();
1066 if broker_addrs.is_empty() {
1067 continue;
1068 }
1069
1070 for (id, addr) in broker_addrs.iter() {
1071 if addr.is_empty() {
1072 continue;
1073 }
1074 if consumer_empty && *id != mix_all::MASTER_ID {
1076 continue;
1077 }
1078 list.push((*id, broker_name.clone(), addr.clone()));
1079 }
1080 }
1081 list
1082 };
1083
1084 if broker_list.is_empty() {
1085 return false;
1086 }
1087
1088 let task_count = broker_list.len();
1089
1090 let mut tasks = Vec::new();
1092 for (broker_id, broker_name, addr) in broker_list {
1093 let heartbeat_data = heartbeat_data.clone();
1095 let client_id = self.client_id.clone();
1096 let mq_client_api = self.mq_client_api_impl.as_ref().unwrap().clone();
1097 let timeout = self.client_config.mq_client_api_timeout;
1098 let send_heartbeat_times_total = self.send_heartbeat_times_total.clone();
1099 let topic_route_table_clone = self.topic_route_table.clone();
1100
1101 let task = tokio::spawn(async move {
1104 match mq_client_api
1105 .mut_from_ref()
1106 .send_heartbeat(&addr, &heartbeat_data, timeout)
1107 .await
1108 {
1109 Ok((version, _response)) => {
1110 let times = send_heartbeat_times_total.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1111 if times % 20 == 0 {
1112 info!(
1113 "send heart beat to broker[{} {} {}] success (concurrent)",
1114 broker_name, broker_id, addr
1115 );
1116 }
1117 (broker_name, addr, Some(version), true)
1118 }
1119 Err(_) => {
1120 let is_in_ns = topic_route_table_clone.iter().any(|route_entry| {
1122 route_entry.value().broker_datas.iter().any(|bd| {
1123 bd.broker_addrs()
1124 .iter()
1125 .any(|(_, broker_addr)| broker_addr.as_str() == addr.as_str())
1126 })
1127 });
1128
1129 if is_in_ns {
1130 warn!(
1131 "send heart beat to broker[{} {} {}] failed (concurrent)",
1132 broker_name, broker_id, addr
1133 );
1134 } else {
1135 warn!(
1136 "send heart beat to broker[{} {} {}] exception, because the broker not up, forget it \
1137 (concurrent)",
1138 broker_name, broker_id, addr
1139 );
1140 }
1141 (broker_name, addr, None, false)
1142 }
1143 }
1144 });
1145
1146 tasks.push(task);
1147 }
1148
1149 let results = match tokio::time::timeout(Duration::from_millis(3000), future::join_all(tasks)).await {
1151 Ok(results) => results,
1152 Err(_) => {
1153 warn!(
1154 "Concurrent heartbeat timeout after 3000ms for client [{}]",
1155 self.client_id
1156 );
1157 return false;
1158 }
1159 };
1160
1161 let mut success_count = 0;
1164
1165 for result in results {
1166 match result {
1167 Ok((broker_name, addr, version_opt, success)) => {
1168 if success {
1169 success_count += 1;
1170 if let Some(version) = version_opt {
1171 self.broker_version_table
1172 .entry(broker_name.clone())
1173 .or_default()
1174 .insert(addr, version);
1175 }
1176 }
1177 }
1178 Err(e) => {
1179 warn!("Concurrent heartbeat task panicked: {:?}", e);
1180 }
1181 }
1182 }
1183
1184 info!(
1185 "Concurrent heartbeat completed for client [{}]: {}/{} succeeded",
1186 self.client_id, success_count, task_count
1187 );
1188
1189 success_count > 0
1191 }
1192
1193 async fn send_heartbeat_to_all_broker(&self) -> bool {
1194 let heartbeat_data = self.prepare_heartbeat_data(false).await;
1195 let producer_empty = heartbeat_data.producer_data_set.is_empty();
1196 let consumer_empty = heartbeat_data.consumer_data_set.is_empty();
1197 if producer_empty && consumer_empty {
1198 warn!(
1199 "sending heartbeat, but no consumer and no producer. [{}]",
1200 self.client_id
1201 );
1202 return false;
1203 }
1204 if self.broker_addr_table.is_empty() {
1205 return false;
1206 }
1207 for broker_entry in self.broker_addr_table.iter() {
1208 let (broker_name, broker_addrs) = broker_entry.pair();
1209 if broker_addrs.is_empty() {
1210 continue;
1211 }
1212 for (id, addr) in broker_addrs.iter() {
1213 if addr.is_empty() {
1214 continue;
1215 }
1216 if consumer_empty && *id != mix_all::MASTER_ID {
1217 continue;
1218 }
1219 self.send_heartbeat_to_broker_inner(*id, broker_name, addr, &heartbeat_data)
1220 .await;
1221 }
1222 }
1223
1224 true
1225 }
1226
1227 pub async fn send_heartbeat_to_broker(
1228 &self,
1229 id: u64,
1230 broker_name: &CheetahString,
1231 addr: &CheetahString,
1232 strict_lock_mode: bool,
1233 ) -> bool {
1234 let _guard = match self.lock_heartbeat.try_lock().await {
1235 Some(g) => g,
1236 None => {
1237 if strict_lock_mode {
1238 warn!("lock heartBeat, but failed. [{}]", self.client_id);
1239 }
1240 return false;
1241 }
1242 };
1243
1244 let heartbeat_data = self.prepare_heartbeat_data(false).await;
1245 let producer_empty = heartbeat_data.producer_data_set.is_empty();
1246 let consumer_empty = heartbeat_data.consumer_data_set.is_empty();
1247 if producer_empty && consumer_empty {
1248 warn!(
1249 "sending heartbeat, but no consumer and no producer. [{}]",
1250 self.client_id
1251 );
1252 return false;
1253 }
1254
1255 if self.client_config.use_heartbeat_v2 {
1256 self.send_heartbeat_to_broker_v2(id, broker_name, addr, heartbeat_data)
1257 .await
1258 } else {
1259 let (result, _) = self
1260 .send_heartbeat_to_broker_inner(id, broker_name, addr, &heartbeat_data)
1261 .await;
1262 result
1263 }
1264 }
1265
1266 async fn send_heartbeat_to_broker_v2(
1268 &self,
1269 id: u64,
1270 broker_name: &CheetahString,
1271 addr: &CheetahString,
1272 mut heartbeat_data_with_sub: HeartbeatData,
1273 ) -> bool {
1274 let current_fingerprint = heartbeat_data_with_sub.compute_heartbeat_fingerprint();
1276 heartbeat_data_with_sub.heartbeat_fingerprint = current_fingerprint;
1277
1278 let broker_support_v2 = self.broker_support_v2_heartbeat_set.contains_key(addr);
1280
1281 let last_fingerprint = self
1283 .broker_heartbeat_fingerprint_table
1284 .get(addr)
1285 .map(|entry| *entry.value());
1286
1287 let should_send_minimal =
1289 broker_support_v2 && last_fingerprint.is_some() && last_fingerprint.unwrap() == current_fingerprint;
1290
1291 let heartbeat_data = if should_send_minimal {
1292 let mut minimal = self.prepare_heartbeat_data(true).await;
1294 minimal.heartbeat_fingerprint = current_fingerprint;
1295 minimal
1296 } else {
1297 heartbeat_data_with_sub.clone()
1299 };
1300
1301 let (result, support_v2) = self
1303 .send_heartbeat_to_broker_inner(id, broker_name, addr, &heartbeat_data)
1304 .await;
1305
1306 if result {
1307 if let Some(support_v2) = support_v2 {
1308 if support_v2 {
1309 self.broker_support_v2_heartbeat_set.insert(addr.clone(), ());
1311
1312 if !should_send_minimal {
1314 self.broker_heartbeat_fingerprint_table
1315 .insert(addr.clone(), current_fingerprint);
1316 }
1317 } else {
1318 self.broker_support_v2_heartbeat_set.remove(addr);
1320 self.broker_heartbeat_fingerprint_table.remove(addr);
1321
1322 warn!("Broker {} does not support HeartbeatV2, downgrading to V1", addr);
1323 }
1324 }
1325 }
1326
1327 result
1328 }
1329
1330 async fn send_heartbeat_to_broker_inner(
1331 &self,
1332 id: u64,
1333 broker_name: &CheetahString,
1334 addr: &CheetahString,
1335 heartbeat_data: &HeartbeatData,
1336 ) -> (bool, Option<bool>) {
1337 match self
1338 .mq_client_api_impl
1339 .as_ref()
1340 .unwrap()
1341 .mut_from_ref()
1342 .send_heartbeat(addr, heartbeat_data, self.client_config.mq_client_api_timeout)
1343 .await
1344 {
1345 Ok((version, response)) => {
1346 self.broker_version_table
1347 .entry(broker_name.clone())
1348 .or_default()
1349 .insert(addr.clone(), version);
1350
1351 let times = self
1352 .send_heartbeat_times_total
1353 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1354 if times % 20 == 0 {
1355 info!("send heart beat to broker[{} {} {}] success", broker_name, id, addr);
1356 }
1357
1358 let support_v2 = response.and_then(|resp| {
1360 resp.ext_fields().and_then(|fields| {
1361 fields
1362 .get(rocketmq_common::common::mix_all::IS_SUPPORT_HEART_BEAT_V2)
1363 .and_then(|v| v.parse::<bool>().ok())
1364 })
1365 });
1366
1367 (true, support_v2)
1368 }
1369 Err(_) => {
1370 if self.is_broker_in_name_server(addr).await {
1371 warn!("send heart beat to broker[{} {} {}] failed", broker_name, id, addr);
1372 } else {
1373 warn!(
1374 "send heart beat to broker[{} {} {}] exception, because the broker not up, forget it",
1375 broker_name, id, addr
1376 );
1377 }
1378 (false, None)
1379 }
1380 }
1381 }
1382
1383 async fn is_broker_in_name_server(&self, broker_name: &str) -> bool {
1384 for entry in self.topic_route_table.iter() {
1385 for bd in entry.value().broker_datas.iter() {
1386 for (_, value) in bd.broker_addrs().iter() {
1387 if value.as_str() == broker_name {
1388 return true;
1389 }
1390 }
1391 }
1392 }
1393 false
1394 }
1395
1396 async fn prepare_heartbeat_data(&self, is_without_sub: bool) -> HeartbeatData {
1397 let mut heartbeat_data = HeartbeatData {
1398 client_id: self.client_id.clone(),
1399 ..Default::default()
1400 };
1401
1402 for entry in self.consumer_table.iter() {
1403 let mut consumer_data = ConsumerData {
1404 group_name: entry.value().group_name(),
1405 consume_type: entry.value().consume_type(),
1406 message_model: entry.value().message_model(),
1407 consume_from_where: entry.value().consume_from_where(),
1408 subscription_data_set: entry.value().subscriptions(),
1409 unit_mode: entry.value().is_unit_mode(),
1410 };
1411 if !is_without_sub {
1412 entry.value().subscriptions().iter().for_each(|sub| {
1413 consumer_data.subscription_data_set.insert(sub.clone());
1414 });
1415 }
1416 heartbeat_data.consumer_data_set.insert(consumer_data);
1417 }
1418 for entry in self.producer_table.iter() {
1419 let producer_data = ProducerData {
1420 group_name: entry.key().clone(),
1421 };
1422 heartbeat_data.producer_data_set.insert(producer_data);
1423 }
1424 heartbeat_data.is_without_sub = is_without_sub;
1425 heartbeat_data
1426 }
1427
1428 pub async fn register_consumer(&mut self, group: &CheetahString, consumer: MQConsumerInnerImpl) -> bool {
1429 if self.consumer_table.contains_key(group) {
1430 warn!("the consumer group[{}] exist already.", group);
1431 return false;
1432 }
1433 self.consumer_table.insert(group.clone(), consumer);
1434 true
1435 }
1436
1437 pub async fn check_client_in_broker(&mut self) -> rocketmq_error::RocketMQResult<()> {
1438 for entry in self.consumer_table.iter() {
1439 let subscription_inner = entry.value().subscriptions();
1440 if subscription_inner.is_empty() {
1441 return Ok(());
1442 }
1443 for subscription_data in subscription_inner.iter() {
1444 if ExpressionType::is_tag_type(Some(subscription_data.expression_type.as_str())) {
1445 continue;
1446 }
1447 let addr = self.find_broker_addr_by_topic(subscription_data.topic.as_str()).await;
1448 if let Some(addr) = addr {
1449 match self
1450 .mq_client_api_impl
1451 .as_mut()
1452 .unwrap()
1453 .check_client_in_broker(
1454 addr.as_str(),
1455 entry.key().as_str(),
1456 self.client_id.as_str(),
1457 subscription_data,
1458 self.client_config.mq_client_api_timeout,
1459 )
1460 .await
1461 {
1462 Ok(_) => {}
1463 Err(e) => match e {
1464 rocketmq_error::RocketMQError::IllegalArgument(_) => {
1465 return Err(e);
1466 }
1467 _ => {
1468 let desc = format!(
1469 "Check client in broker error, maybe because you use {} to filter message, but \
1470 server has not been upgraded to support!This error would not affect the launch \
1471 of consumer, but may has impact on message receiving if you have use the new \
1472 features which are not supported by server, please check the log!",
1473 subscription_data.expression_type
1474 );
1475 }
1476 },
1477 }
1478 }
1479 }
1480 }
1481
1482 Ok(())
1483 }
1484
1485 pub async fn do_rebalance(&mut self) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
1486 let mut balanced = true;
1487 for entry in self.consumer_table.iter() {
1488 match entry.value().try_rebalance().await {
1489 Ok(result) => {
1490 if !result {
1491 balanced = false;
1492 }
1493 }
1494 Err(e) => {
1495 error!(
1496 "doRebalance for consumer group [{}] exception:{}",
1497 entry.key(),
1498 e.to_string()
1499 );
1500 balanced = false;
1501 }
1502 }
1503 }
1504 Ok(balanced)
1505 }
1506
1507 pub fn rebalance_later(&mut self, delay_millis: u64) {
1508 if delay_millis == 0 {
1509 self.rebalance_service.wakeup();
1510 } else {
1511 let service = self.rebalance_service.clone();
1512 tokio::spawn(async move {
1513 tokio::time::sleep(Duration::from_millis(delay_millis)).await;
1514 service.wakeup();
1515 });
1516 }
1517 }
1518
1519 pub async fn find_broker_address_in_subscribe(
1520 &mut self,
1521 broker_name: &CheetahString,
1522 broker_id: u64,
1523 only_this_broker: bool,
1524 ) -> Option<FindBrokerResult> {
1525 if broker_name.is_empty() {
1526 return None;
1527 }
1528
1529 let mut broker_addr: Option<CheetahString> = None;
1530 let mut slave = false;
1531 let mut found = false;
1532
1533 if let Some(map_ref) = self.broker_addr_table.get(broker_name) {
1534 let map = map_ref.value();
1535 if let Some(addr) = map.get(&broker_id) {
1536 broker_addr = Some(addr.clone());
1537 slave = broker_id != mix_all::MASTER_ID;
1538 found = true;
1539 } else if !found && broker_id != mix_all::MASTER_ID {
1540 if let Some(addr) = map.get(&(broker_id + 1)) {
1541 broker_addr = Some(addr.clone());
1542 found = true;
1543 }
1544 }
1545 if !found && !only_this_broker {
1546 if let Some((key, value)) = map.iter().next() {
1547 broker_addr = Some(value.clone());
1548 slave = *key != mix_all::MASTER_ID;
1549 found = !value.is_empty();
1550 }
1551 }
1552 }
1553
1554 if found {
1555 let broker_addr = broker_addr?;
1556 let broker_version = self.find_broker_version(broker_name, broker_addr.as_str()).await;
1557 Some(FindBrokerResult {
1558 broker_addr,
1559 slave,
1560 broker_version,
1561 })
1562 } else {
1563 None
1564 }
1565 }
1566 async fn find_broker_version(&self, broker_name: &str, broker_addr: &str) -> i32 {
1567 if let Some(map) = self.broker_version_table.get(broker_name) {
1568 if let Some(version) = map.value().get(broker_addr) {
1569 return *version;
1570 }
1571 }
1572 0
1573 }
1574
1575 pub async fn select_consumer(&self, group: &str) -> Option<MQConsumerInnerImpl> {
1576 self.consumer_table.get(group).map(|entry| entry.value().clone())
1577 }
1578
1579 pub async fn select_producer(&self, group: &str) -> Option<MQProducerInnerImpl> {
1580 self.producer_table.get(group).map(|entry| entry.value().clone())
1581 }
1582
1583 pub async fn unregister_consumer(&mut self, group: impl Into<CheetahString>) -> bool {
1584 let group = group.into();
1585 if group.is_empty() {
1586 warn!("unregister_consumer: group name is empty");
1587 return false;
1588 }
1589
1590 let removed = self.consumer_table.remove(&group).is_some();
1591
1592 if removed {
1593 info!("unregister consumer [{}] OK", group);
1594 self.unregister_client(None, Some(group)).await;
1595 true
1596 } else {
1597 warn!("unregister consumer [{}] failed: not found in consumer table", group);
1598 false
1599 }
1600 }
1601
1602 pub async fn unregister_producer(&mut self, group: impl Into<CheetahString>) -> bool {
1603 let group = group.into();
1604 if group.is_empty() {
1605 warn!("unregister_producer: group name is empty");
1606 return false;
1607 }
1608
1609 let removed = self.producer_table.remove(&group).is_some();
1610
1611 if removed {
1612 info!("unregister producer [{}] OK", group);
1613 self.unregister_client(Some(group), None).await;
1614 true
1615 } else {
1616 warn!("unregister producer [{}] failed: not found in producer table", group);
1617 false
1618 }
1619 }
1620
1621 pub async fn unregister_admin_ext(&mut self, group: impl Into<CheetahString>) {
1622 let _ = self.admin_ext_table.remove(&group.into());
1623 }
1624 async fn unregister_client(
1625 &mut self,
1626 producer_group: Option<CheetahString>,
1627 consumer_group: Option<CheetahString>,
1628 ) {
1629 for broker_entry in self.broker_addr_table.iter() {
1630 let (broker_name, broker_addrs) = broker_entry.pair();
1631 for (id, addr) in broker_addrs.iter() {
1632 if let Err(err) = self
1633 .mq_client_api_impl
1634 .as_mut()
1635 .unwrap()
1636 .unregister_client(
1637 addr,
1638 self.client_id.clone(),
1639 producer_group.clone(),
1640 consumer_group.clone(),
1641 self.client_config.mq_client_api_timeout,
1642 )
1643 .await
1644 {
1645 } else {
1646 info!(
1647 "unregister client[Producer: {:?} Consumer: {:?}] from broker[{} {} {}] success",
1648 producer_group, consumer_group, broker_name, id, addr,
1649 );
1650 }
1651 }
1652 }
1653 }
1654
1655 async fn is_broker_addr_exist_in_topic_route_table(&self, addr: &str) -> bool {
1656 for entry in self.topic_route_table.iter() {
1657 for bd in entry.value().broker_datas.iter() {
1658 for (_, value) in bd.broker_addrs().iter() {
1659 if value.as_str() == addr {
1660 return true;
1661 }
1662 }
1663 }
1664 }
1665 false
1666 }
1667
1668 pub async fn query_assignment(
1688 &mut self,
1689 topic: &CheetahString,
1690 consumer_group: &CheetahString,
1691 strategy_name: &CheetahString,
1692 message_model: MessageModel,
1693 timeout: u64,
1694 ) -> rocketmq_error::RocketMQResult<Option<HashSet<MessageQueueAssignment>>> {
1695 let mut broker_addr = self.find_broker_addr_by_topic(topic).await;
1697
1698 if broker_addr.is_none() {
1700 self.update_topic_route_info_from_name_server_topic(topic).await;
1701 broker_addr = self.find_broker_addr_by_topic(topic).await;
1702 }
1703 if let Some(broker_addr) = broker_addr {
1704 let client_id = self.client_id.clone();
1705 match self.mq_client_api_impl.as_mut() {
1706 Some(api_impl) => {
1707 api_impl
1708 .query_assignment(
1709 &broker_addr,
1710 topic.clone(),
1711 consumer_group.clone(),
1712 client_id,
1713 strategy_name.clone(),
1714 message_model,
1715 timeout,
1716 )
1717 .await
1718 }
1719 None => Err(mq_client_err!("mq_client_api_impl is None")),
1720 }
1721 } else {
1722 Ok(None)
1723 }
1724 }
1725
1726 pub async fn consume_message_directly(
1727 &self,
1728 message: MessageExt,
1729 consumer_group: &CheetahString,
1730 broker_name: Option<CheetahString>,
1731 ) -> Option<ConsumeMessageDirectlyResult> {
1732 let consumer_inner = self.consumer_table.get(consumer_group);
1733 if let Some(entry) = consumer_inner {
1734 return entry.value().consume_message_directly(message, broker_name).await;
1735 }
1736
1737 None
1738 }
1739}
1740
1741#[allow(clippy::unnecessary_unwrap)]
1742pub fn topic_route_data2topic_publish_info(topic: &str, route: &mut TopicRouteData) -> TopicPublishInfo {
1743 let mut info = TopicPublishInfo {
1744 topic_route_data: Some(route.clone()),
1745 ..Default::default()
1746 };
1747 if route.order_topic_conf.is_some() && !route.order_topic_conf.as_ref().unwrap().is_empty() {
1748 let brokers = route
1749 .order_topic_conf
1750 .as_ref()
1751 .unwrap()
1752 .split(";")
1753 .map(|s| s.to_string())
1754 .collect::<Vec<String>>();
1755 for broker in brokers {
1756 let item = broker.split(":").collect::<Vec<&str>>();
1757 if item.len() == 2 {
1758 let queue_num = item[1].parse::<i32>().unwrap();
1759 for i in 0..queue_num {
1760 let mq = MessageQueue::from_parts(topic, item[0], i);
1761 info.message_queue_list.push(mq);
1762 }
1763 }
1764 }
1765 info.order_topic = true;
1766 } else if route.order_topic_conf.is_none()
1767 && route.topic_queue_mapping_by_broker.is_some()
1768 && !route.topic_queue_mapping_by_broker.as_ref().unwrap().is_empty()
1769 {
1770 info.order_topic = false;
1771 let mq_end_points = ClientMetadata::topic_route_data2endpoints_for_static_topic(topic, route);
1772 if let Some(mq_end_points) = mq_end_points {
1773 for (mq, broker_name) in mq_end_points {
1774 info.message_queue_list.push(mq);
1775 }
1776 }
1777 info.message_queue_list
1778 .sort_by(|a, b| match a.queue_id().cmp(&b.queue_id()) {
1779 Ordering::Less => std::cmp::Ordering::Less,
1780 Ordering::Equal => std::cmp::Ordering::Equal,
1781 Ordering::Greater => std::cmp::Ordering::Greater,
1782 });
1783 } else {
1784 route.queue_datas.sort();
1785 for queue_data in route.queue_datas.iter() {
1786 if PermName::is_writeable(queue_data.perm) {
1787 let mut broker_data = None;
1788 for bd in route.broker_datas.iter() {
1789 if bd.broker_name() == queue_data.broker_name.as_str() {
1790 broker_data = Some(bd.clone());
1791 break;
1792 }
1793 }
1794 if broker_data.is_none() {
1795 continue;
1796 }
1797 if !broker_data
1798 .as_ref()
1799 .unwrap()
1800 .broker_addrs()
1801 .contains_key(&(mix_all::MASTER_ID))
1802 {
1803 continue;
1804 }
1805 for i in 0..queue_data.write_queue_nums {
1806 let mq = MessageQueue::from_parts(topic, queue_data.broker_name.as_str(), i as i32);
1807 info.message_queue_list.push(mq);
1808 }
1809 }
1810 }
1811 }
1812 info
1813}
1814
1815pub fn topic_route_data2topic_subscribe_info(topic: &str, route: &TopicRouteData) -> HashSet<MessageQueue> {
1816 if let Some(ref topic_queue_mapping_by_broker) = route.topic_queue_mapping_by_broker {
1817 if !topic_queue_mapping_by_broker.is_empty() {
1818 let mq_endpoints = ClientMetadata::topic_route_data2endpoints_for_static_topic(topic, route);
1819 return mq_endpoints
1820 .unwrap_or_default()
1821 .keys()
1822 .cloned()
1823 .collect::<HashSet<MessageQueue>>();
1824 }
1825 }
1826 let mut mq_list = HashSet::new();
1827 for qd in &route.queue_datas {
1828 if PermName::is_readable(qd.perm) {
1829 for i in 0..qd.read_queue_nums {
1830 let mq = MessageQueue::from_parts(topic, qd.broker_name.as_str(), i as i32);
1831 mq_list.insert(mq);
1832 }
1833 }
1834 }
1835 mq_list
1836}