1use std::cmp::Ordering;
18use std::collections::HashMap;
19use std::collections::HashSet;
20use std::sync::atomic::AtomicI64;
21use std::sync::Arc;
22use std::thread;
23use std::time::Duration;
24
25use cheetah_string::CheetahString;
26use rand::seq::IndexedRandom;
27use rocketmq_common::common::base::service_state::ServiceState;
28use rocketmq_common::common::constant::PermName;
29use rocketmq_common::common::filter::expression_type::ExpressionType;
30use rocketmq_common::common::message::message_ext::MessageExt;
31use rocketmq_common::common::message::message_queue::MessageQueue;
32use rocketmq_common::common::message::message_queue_assignment::MessageQueueAssignment;
33use rocketmq_common::common::mix_all;
34use rocketmq_common::TimeUtils::get_current_millis;
35use rocketmq_error::mq_client_err;
36use rocketmq_remoting::base::connection_net_event::ConnectionNetEvent;
37use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;
38use rocketmq_remoting::protocol::heartbeat::consumer_data::ConsumerData;
39use rocketmq_remoting::protocol::heartbeat::heartbeat_data::HeartbeatData;
40use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
41use rocketmq_remoting::protocol::heartbeat::producer_data::ProducerData;
42use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData;
43use rocketmq_remoting::rpc::client_metadata::ClientMetadata;
44use rocketmq_remoting::runtime::config::client_config::TokioClientConfig;
45use rocketmq_remoting::runtime::RPCHook;
46use rocketmq_runtime::RocketMQRuntime;
47use rocketmq_rust::ArcMut;
48use rocketmq_rust::RocketMQTokioMutex;
49use tokio::runtime::Handle;
50use tokio::sync::RwLock;
51use tracing::error;
52use tracing::info;
53use tracing::warn;
54
55use crate::admin::mq_admin_ext_async_inner::MQAdminExtInnerImpl;
56use crate::base::client_config::ClientConfig;
57use crate::consumer::consumer_impl::pull_message_service::PullMessageService;
58use crate::consumer::consumer_impl::re_balance::rebalance_service::RebalanceService;
59use crate::consumer::mq_consumer_inner::MQConsumerInner;
60use crate::consumer::mq_consumer_inner::MQConsumerInnerImpl;
61use crate::implementation::client_remoting_processor::ClientRemotingProcessor;
62use crate::implementation::find_broker_result::FindBrokerResult;
63use crate::implementation::mq_admin_impl::MQAdminImpl;
64use crate::implementation::mq_client_api_impl::MQClientAPIImpl;
65use crate::producer::default_mq_producer::DefaultMQProducer;
66use crate::producer::default_mq_producer::ProducerConfig;
67use crate::producer::producer_impl::mq_producer_inner::MQProducerInnerImpl;
68use crate::producer::producer_impl::topic_publish_info::TopicPublishInfo;
69
70const LOCK_TIMEOUT_MILLIS: u64 = 3000;
71
72pub struct MQClientInstance {
73 pub(crate) client_config: ArcMut<ClientConfig>,
74 pub(crate) client_id: CheetahString,
75 boot_timestamp: u64,
76 producer_table: Arc<RwLock<HashMap<CheetahString, MQProducerInnerImpl>>>,
81 consumer_table: Arc<RwLock<HashMap<CheetahString, MQConsumerInnerImpl>>>,
86 admin_ext_table: Arc<RwLock<HashMap<CheetahString, MQAdminExtInnerImpl>>>,
91 pub(crate) mq_client_api_impl: Option<ArcMut<MQClientAPIImpl>>,
92 pub(crate) mq_admin_impl: ArcMut<MQAdminImpl>,
93 pub(crate) topic_route_table: Arc<RwLock<HashMap<CheetahString , TopicRouteData>>>,
94 topic_end_points_table: Arc<
95 RwLock<
96 HashMap<
97 CheetahString, HashMap<MessageQueue, CheetahString >,
99 >,
100 >,
101 >,
102 lock_namesrv: Arc<RocketMQTokioMutex<()>>,
103 lock_heartbeat: Arc<RocketMQTokioMutex<()>>,
104
105 service_state: ServiceState,
106 pub(crate) pull_message_service: ArcMut<PullMessageService>,
107 rebalance_service: RebalanceService,
108 pub(crate) default_producer: ArcMut<DefaultMQProducer>,
109 instance_runtime: Arc<RocketMQRuntime>,
110 broker_addr_table: Arc<RwLock<HashMap<CheetahString, HashMap<u64, CheetahString>>>>,
111 broker_version_table: Arc<
112 RwLock<
113 HashMap<
114 CheetahString, HashMap<CheetahString , i32>,
116 >,
117 >,
118 >,
119 send_heartbeat_times_total: Arc<AtomicI64>,
120}
121
122impl MQClientInstance {
123 pub fn new(
124 client_config: ClientConfig,
125 instance_index: i32,
126 client_id: String,
127 rpc_hook: Option<Arc<Box<dyn RPCHook>>>,
128 ) -> Self {
129 unimplemented!()
186 }
187
188 pub fn new_arc(
189 client_config: ClientConfig,
190 instance_index: i32,
191 client_id: impl Into<CheetahString>,
192 rpc_hook: Option<Arc<Box<dyn RPCHook>>>,
193 ) -> ArcMut<MQClientInstance> {
194 let broker_addr_table = Arc::new(Default::default());
195 let mut instance = ArcMut::new(MQClientInstance {
196 client_config: ArcMut::new(client_config.clone()),
197 client_id: client_id.into(),
198 boot_timestamp: get_current_millis(),
199 producer_table: Arc::new(RwLock::new(HashMap::new())),
200 consumer_table: Arc::new(Default::default()),
201 admin_ext_table: Arc::new(Default::default()),
202 mq_client_api_impl: None,
203 mq_admin_impl: ArcMut::new(MQAdminImpl::new()),
204 topic_route_table: Arc::new(Default::default()),
205 topic_end_points_table: Arc::new(Default::default()),
206 lock_namesrv: Default::default(),
207 lock_heartbeat: Default::default(),
208 service_state: ServiceState::CreateJust,
209 pull_message_service: ArcMut::new(PullMessageService::new()),
210 rebalance_service: RebalanceService::new(),
211 default_producer: ArcMut::new(
212 DefaultMQProducer::builder()
213 .producer_group(mix_all::CLIENT_INNER_PRODUCER_GROUP)
214 .client_config(client_config.clone())
215 .build(),
216 ),
217 instance_runtime: Arc::new(RocketMQRuntime::new_multi(
218 num_cpus::get(),
219 "mq-client-instance",
220 )),
221 broker_addr_table,
222 broker_version_table: Arc::new(Default::default()),
223 send_heartbeat_times_total: Arc::new(AtomicI64::new(0)),
224 });
225 let instance_clone = instance.clone();
226 instance.mq_admin_impl.set_client(instance_clone);
227 let weak_instance = ArcMut::downgrade(&instance);
228 let (tx, mut rx) = tokio::sync::broadcast::channel::<ConnectionNetEvent>(16);
229
230 let mq_client_api_impl = ArcMut::new(MQClientAPIImpl::new(
231 Arc::new(TokioClientConfig::default()),
232 ClientRemotingProcessor::new(instance.clone()),
233 rpc_hook,
234 client_config.clone(),
235 Some(tx),
236 ));
237 instance.mq_client_api_impl = Some(mq_client_api_impl.clone());
238 if let Some(namesrv_addr) = client_config.namesrv_addr.as_deref() {
239 let handle = Handle::current();
240
241 let namesrv_addr = namesrv_addr.to_string();
242 thread::spawn(move || {
243 handle.block_on(async move {
244 mq_client_api_impl
245 .update_name_server_address_list(namesrv_addr.as_str())
246 .await;
247 })
248 });
249 }
250 tokio::spawn(async move {
251 while let Ok(value) = rx.recv().await {
252 if let Some(instance_) = weak_instance.upgrade() {
253 match value {
254 ConnectionNetEvent::CONNECTED(remote_address) => {
255 info!("ConnectionNetEvent CONNECTED");
256 let broker_addr_table = instance_.broker_addr_table.read().await;
257 for (broker_name, broker_addrs) in broker_addr_table.iter() {
258 for (id, addr) in broker_addrs.iter() {
259 if addr == remote_address.to_string().as_str()
260 && instance_
261 .send_heartbeat_to_broker(*id, broker_name, addr)
262 .await
263 {
264 instance_.re_balance_immediately();
265 }
266 }
267 }
268 }
269 ConnectionNetEvent::DISCONNECTED => {}
270 ConnectionNetEvent::EXCEPTION => {}
271 }
272 }
273 }
274 warn!("ConnectionNetEvent recv error");
275 });
276 instance
277 }
278
279 pub fn re_balance_immediately(&self) {
280 self.rebalance_service.wakeup();
281 }
282
283 pub fn re_balance_later(&self, delay_millis: Duration) {
284 if delay_millis <= Duration::from_millis(0) {
285 self.rebalance_service.wakeup();
286 } else {
287 let service = self.rebalance_service.clone();
288 tokio::spawn(async move {
289 tokio::time::sleep(delay_millis).await;
290 service.wakeup();
291 });
292 }
293 }
294
295 pub async fn start(&mut self, this: ArcMut<Self>) -> rocketmq_error::RocketMQResult<()> {
296 match self.service_state {
297 ServiceState::CreateJust => {
298 self.service_state = ServiceState::StartFailed;
299 if self.client_config.namesrv_addr.is_none() {
301 self.mq_client_api_impl
302 .as_mut()
303 .expect("mq_client_api_impl is None")
304 .fetch_name_server_addr()
305 .await;
306 }
307 self.mq_client_api_impl
309 .as_mut()
310 .expect("mq_client_api_impl is None")
311 .start()
312 .await;
313 self.start_scheduled_task(this.clone());
315 let instance = this.clone();
317 self.pull_message_service.start(instance).await;
318 self.rebalance_service.start(this).await;
320 self.default_producer
323 .default_mqproducer_impl
324 .as_mut()
325 .unwrap()
326 .start_with_factory(false)
327 .await?;
328 info!("the client factory[{}] start OK", self.client_id);
329 self.service_state = ServiceState::Running;
330 }
331 ServiceState::Running => {}
332 ServiceState::ShutdownAlready => {}
333 ServiceState::StartFailed => {
334 return mq_client_err!(format!(
335 "The Factory object[{}] has been created before, and failed.",
336 self.client_id
337 ));
338 }
339 }
340 Ok(())
341 }
342
343 pub async fn shutdown(&mut self) {}
344
345 pub async fn register_producer(&mut self, group: &str, producer: MQProducerInnerImpl) -> bool {
346 if group.is_empty() {
347 return false;
348 }
349 let mut producer_table = self.producer_table.write().await;
350 if producer_table.contains_key(group) {
351 warn!("the producer group[{}] exist already.", group);
352 return false;
353 }
354 producer_table.insert(group.into(), producer);
355 true
356 }
357
358 pub async fn register_admin_ext(&mut self, group: &str, admin: MQAdminExtInnerImpl) -> bool {
359 if group.is_empty() {
360 return false;
361 }
362 let mut admin_ext_table = self.admin_ext_table.write().await;
363 if admin_ext_table.contains_key(group) {
364 warn!("the admin group[{}] exist already.", group);
365 return false;
366 }
367 admin_ext_table.insert(group.into(), admin);
368 true
369 }
370
371 fn start_scheduled_task(&mut self, this: ArcMut<Self>) {
372 if self.client_config.namesrv_addr.is_none() {
373 let mut mq_client_api_impl = self.mq_client_api_impl.as_ref().unwrap().clone();
375 self.instance_runtime.get_handle().spawn(async move {
376 info!("ScheduledTask fetchNameServerAddr started");
377 tokio::time::sleep(Duration::from_secs(10)).await;
378 loop {
379 let current_execution_time = tokio::time::Instant::now();
380 mq_client_api_impl.fetch_name_server_addr().await;
381 let next_execution_time = current_execution_time + Duration::from_secs(120);
382 let delay =
383 next_execution_time.saturating_duration_since(tokio::time::Instant::now());
384 tokio::time::sleep(delay).await;
385 }
386 });
387 }
388
389 let mut client_instance = this.clone();
391 let poll_name_server_interval = self.client_config.poll_name_server_interval;
392 self.instance_runtime.get_handle().spawn(async move {
393 info!("ScheduledTask update_topic_route_info_from_name_server started");
394 tokio::time::sleep(Duration::from_millis(10)).await;
395 loop {
396 let current_execution_time = tokio::time::Instant::now();
397 client_instance
398 .update_topic_route_info_from_name_server()
399 .await;
400 let next_execution_time = current_execution_time
401 + Duration::from_millis(poll_name_server_interval as u64);
402 let delay =
403 next_execution_time.saturating_duration_since(tokio::time::Instant::now());
404 tokio::time::sleep(delay).await;
405 }
406 });
407
408 let mut client_instance = this.clone();
410 let heartbeat_broker_interval = self.client_config.heartbeat_broker_interval;
411 self.instance_runtime.get_handle().spawn(async move {
412 info!("ScheduledTask clean_offline_broker started");
413 tokio::time::sleep(Duration::from_secs(1)).await;
414 loop {
415 let current_execution_time = tokio::time::Instant::now();
416 client_instance.clean_offline_broker().await;
417 client_instance
418 .send_heartbeat_to_all_broker_with_lock()
419 .await;
420 let next_execution_time = current_execution_time
421 + Duration::from_millis(heartbeat_broker_interval as u64);
422 let delay =
423 next_execution_time.saturating_duration_since(tokio::time::Instant::now());
424 tokio::time::sleep(delay).await;
425 }
426 });
427
428 let mut client_instance = this;
430 let persist_consumer_offset_interval =
431 self.client_config.persist_consumer_offset_interval as u64;
432 self.instance_runtime.get_handle().spawn(async move {
433 info!("ScheduledTask persistAllConsumerOffset started");
434 tokio::time::sleep(Duration::from_secs(10)).await;
435 loop {
436 let current_execution_time = tokio::time::Instant::now();
437 client_instance.persist_all_consumer_offset().await;
438 let next_execution_time = current_execution_time
439 + Duration::from_millis(persist_consumer_offset_interval);
440 let delay =
441 next_execution_time.saturating_duration_since(tokio::time::Instant::now());
442 tokio::time::sleep(delay).await;
443 }
444 });
445 }
446
447 pub async fn update_topic_route_info_from_name_server(&mut self) {
448 let mut topic_list = HashSet::new();
449
450 {
451 let producer_table = self.producer_table.read().await;
452 for (_, value) in producer_table.iter() {
453 topic_list.extend(value.get_publish_topic_list());
454 }
455 }
456
457 {
458 let consumer_table = self.consumer_table.read().await;
459 for (_, value) in consumer_table.iter() {
460 value.subscriptions().iter().for_each(|sub| {
461 topic_list.insert(sub.topic.clone());
462 });
463 }
464 }
465
466 for topic in topic_list.iter() {
467 self.update_topic_route_info_from_name_server_topic(topic)
468 .await;
469 }
470 }
471
472 #[inline]
473 pub async fn update_topic_route_info_from_name_server_topic(
474 &mut self,
475 topic: &CheetahString,
476 ) -> bool {
477 self.update_topic_route_info_from_name_server_default(topic, false, None)
478 .await
479 }
480
481 pub async fn find_consumer_id_list(
482 &mut self,
483 topic: &CheetahString,
484 group: &CheetahString,
485 ) -> Option<Vec<CheetahString>> {
486 let mut broker_addr = self.find_broker_addr_by_topic(topic).await;
487 if broker_addr.is_none() {
488 self.update_topic_route_info_from_name_server_topic(topic)
489 .await;
490 broker_addr = self.find_broker_addr_by_topic(topic).await;
491 }
492 if let Some(broker_addr) = broker_addr {
493 match self
494 .mq_client_api_impl
495 .as_mut()
496 .unwrap()
497 .get_consumer_id_list_by_group(
498 broker_addr.as_str(),
499 group,
500 self.client_config.mq_client_api_timeout,
501 )
502 .await
503 {
504 Ok(value) => return Some(value),
505 Err(e) => {
506 warn!(
507 "getConsumerIdListByGroup exception,{} {}, err:{}",
508 broker_addr,
509 group,
510 e.to_string()
511 );
512 }
513 }
514 }
515 None
516 }
517
518 pub async fn find_broker_addr_by_topic(&self, topic: &str) -> Option<CheetahString> {
519 let topic_route_table = self.topic_route_table.read().await;
520 if let Some(topic_route_data) = topic_route_table.get(topic) {
521 let brokers = &topic_route_data.broker_datas;
522 if !brokers.is_empty() {
523 let bd = brokers.choose(&mut rand::rng());
524 if let Some(bd) = bd {
525 return bd.select_broker_addr();
526 }
527 }
528 }
529 None
530 }
531
532 pub async fn update_topic_route_info_from_name_server_default(
533 &mut self,
534 topic: &CheetahString,
535 is_default: bool,
536 producer_config: Option<&Arc<ProducerConfig>>,
537 ) -> bool {
538 let lock = self.lock_namesrv.lock().await;
539 let topic_route_data = if let (true, Some(producer_config)) = (is_default, producer_config)
540 {
541 let mut result = match self
542 .mq_client_api_impl
543 .as_mut()
544 .unwrap()
545 .get_default_topic_route_info_from_name_server(
546 self.client_config.mq_client_api_timeout,
547 )
548 .await
549 {
550 Ok(value) => value,
551 Err(e) => {
552 error!(
553 "get_default_topic_route_info_from_name_server failed, topic: {}, error: \
554 {}",
555 topic, e
556 );
557 None
558 }
559 };
560 if let Some(topic_route_data) = result.as_mut() {
561 for data in topic_route_data.queue_datas.iter_mut() {
562 let queue_nums = producer_config
563 .default_topic_queue_nums()
564 .max(data.read_queue_nums);
565 data.read_queue_nums = queue_nums;
566 data.write_queue_nums = queue_nums;
567 }
568 }
569 result
570 } else {
571 self.mq_client_api_impl
572 .as_mut()
573 .unwrap()
574 .get_topic_route_info_from_name_server(
575 topic,
576 self.client_config.mq_client_api_timeout,
577 )
578 .await
579 .unwrap_or(None)
580 };
581 if let Some(mut topic_route_data) = topic_route_data {
582 let mut topic_route_table = self.topic_route_table.write().await;
583 let old = topic_route_table.get(topic);
584 let mut changed = topic_route_data.topic_route_data_changed(old);
585 if !changed {
586 changed = self.is_need_update_topic_route_info(topic).await;
587 } else {
588 info!(
589 "the topic[{}] route info changed, old[{:?}] ,new[{:?}]",
590 topic, old, topic_route_data
591 )
592 }
593 if changed {
594 let mut broker_addr_table = self.broker_addr_table.write().await;
595 for bd in topic_route_data.broker_datas.iter() {
596 broker_addr_table.insert(bd.broker_name().clone(), bd.broker_addrs().clone());
597 }
598 drop(broker_addr_table);
599
600 {
602 let mq_end_points = ClientMetadata::topic_route_data2endpoints_for_static_topic(
603 topic,
604 &topic_route_data,
605 );
606 if let Some(mq_end_points) = mq_end_points {
607 if !mq_end_points.is_empty() {
608 let mut topic_end_points_table =
609 self.topic_end_points_table.write().await;
610 topic_end_points_table.insert(topic.into(), mq_end_points);
611 }
612 }
613 }
614
615 {
617 let mut publish_info =
618 topic_route_data2topic_publish_info(topic, &mut topic_route_data);
619 publish_info.have_topic_router_info = true;
620 let mut producer_table = self.producer_table.write().await;
621 for (_, value) in producer_table.iter_mut() {
622 value.update_topic_publish_info(
623 topic.to_string(),
624 Some(publish_info.clone()),
625 );
626 }
627 }
628
629 {
631 let consumer_table = self.consumer_table.read().await;
632 if !consumer_table.is_empty() {
633 let subscribe_info =
634 topic_route_data2topic_subscribe_info(topic, &topic_route_data);
635 for (_, value) in consumer_table.iter() {
636 value
637 .update_topic_subscribe_info(topic.clone(), &subscribe_info)
638 .await;
639 }
640 }
641 }
642 let clone_topic_route_data = TopicRouteData::from_existing(&topic_route_data);
643 topic_route_table.insert(topic.clone(), clone_topic_route_data);
644 return true;
645 }
646 } else {
647 warn!(
648 "updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, \
649 Topic: {}. [{}]",
650 topic, self.client_id
651 );
652 }
653
654 drop(lock);
655 false
656 }
657
658 async fn is_need_update_topic_route_info(&self, topic: &CheetahString) -> bool {
659 let mut result = false;
660 let producer_table = self.producer_table.read().await;
661 for (key, value) in producer_table.iter() {
662 if !result {
663 result = value.is_publish_topic_need_update(topic);
664 break;
665 }
666 }
667 if result {
668 return true;
669 }
670
671 let consumer_table = self.consumer_table.read().await;
672 for (key, value) in consumer_table.iter() {
673 if !result {
674 result = value.is_subscribe_topic_need_update(topic).await;
675 break;
676 }
677 }
678 result
679 }
680
681 pub async fn persist_all_consumer_offset(&mut self) {
682 let consumer_table = self.consumer_table.read().await;
683 for (_, value) in consumer_table.iter() {
684 value.persist_consumer_offset().await;
685 }
686 }
687
688 pub async fn clean_offline_broker(&mut self) {
689 let lock = self
690 .lock_namesrv
691 .try_lock_timeout(Duration::from_millis(LOCK_TIMEOUT_MILLIS))
692 .await;
693 if let Some(lock) = lock {
694 let mut broker_addr_table = self.broker_addr_table.write().await;
695 let mut updated_table = HashMap::with_capacity(broker_addr_table.len());
696 let mut broker_name_set = HashSet::new();
697 for (broker_name, one_table) in broker_addr_table.iter() {
698 let mut clone_addr_table = one_table.clone();
699 let mut remove_id_set = HashSet::new();
700 for (id, addr) in one_table.iter() {
701 if !self.is_broker_addr_exist_in_topic_route_table(addr).await {
702 remove_id_set.insert(*id);
703 }
704 }
705 clone_addr_table.retain(|k, _| !remove_id_set.contains(k));
706 if clone_addr_table.is_empty() {
707 info!(
708 "the broker[{}] name's host is offline, remove it",
709 broker_name
710 );
711 broker_name_set.insert(broker_name.clone());
712 } else {
713 updated_table.insert(broker_name.clone(), clone_addr_table);
714 }
715 }
716 broker_addr_table.retain(|k, _| !broker_name_set.contains(k));
717 if !updated_table.is_empty() {
718 broker_addr_table.extend(updated_table);
719 }
720 }
721 }
722 pub async fn send_heartbeat_to_all_broker_with_lock(&mut self) -> bool {
723 if let Some(lock) = self.lock_heartbeat.try_lock().await {
724 if self.client_config.use_heartbeat_v2 {
725 self.send_heartbeat_to_all_broker_v2(false).await
726 } else {
727 self.send_heartbeat_to_all_broker().await
728 }
729 } else {
730 warn!("lock heartBeat, but failed. [{}]", self.client_id);
731 false
732 }
733 }
734
735 pub async fn send_heartbeat_to_all_broker_with_lock_v2(&mut self, is_rebalance: bool) -> bool {
736 if let Some(lock) = self
737 .lock_heartbeat
738 .try_lock_timeout(Duration::from_secs(2))
739 .await
740 {
741 if self.client_config.use_heartbeat_v2 {
742 self.send_heartbeat_to_all_broker_v2(is_rebalance).await
743 } else {
744 self.send_heartbeat_to_all_broker().await
745 }
746 } else {
747 warn!("lock heartBeat, but failed. [{}]", self.client_id);
748 false
749 }
750 }
751
752 pub fn get_mq_client_api_impl(&self) -> ArcMut<MQClientAPIImpl> {
753 self.mq_client_api_impl.as_ref().unwrap().clone()
754 }
755
756 pub async fn get_broker_name_from_message_queue(
757 &self,
758 message_queue: &MessageQueue,
759 ) -> CheetahString {
760 let guard = self.topic_end_points_table.read().await;
761 if let Some(broker_name) = guard.get(message_queue.get_topic()) {
762 if let Some(addr) = broker_name.get(message_queue) {
763 return addr.clone();
764 }
765 }
766 message_queue.get_broker_name().clone()
767 }
768
769 pub async fn find_broker_address_in_publish(
770 &self,
771 broker_name: &CheetahString,
772 ) -> Option<CheetahString> {
773 if broker_name.is_empty() {
774 return None;
775 }
776 let guard = self.broker_addr_table.read().await;
777 let map = guard.get(broker_name);
778 if let Some(map) = map {
779 return map.get(&(mix_all::MASTER_ID)).cloned();
780 }
781 None
782 }
783
784 async fn send_heartbeat_to_all_broker_v2(&self, is_rebalance: bool) -> bool {
785 unimplemented!()
786 }
787
788 async fn send_heartbeat_to_all_broker(&self) -> bool {
789 let heartbeat_data = self.prepare_heartbeat_data(false).await;
790 let producer_empty = heartbeat_data.producer_data_set.is_empty();
791 let consumer_empty = heartbeat_data.consumer_data_set.is_empty();
792 if producer_empty && consumer_empty {
793 warn!(
794 "sending heartbeat, but no consumer and no producer. [{}]",
795 self.client_id
796 );
797 return false;
798 }
799 let broker_addr_table = self.broker_addr_table.read().await;
800 if broker_addr_table.is_empty() {
801 return false;
802 }
803 for (broker_name, broker_addrs) in broker_addr_table.iter() {
804 if broker_addrs.is_empty() {
805 continue;
806 }
807 for (id, addr) in broker_addrs.iter() {
808 if addr.is_empty() {
809 continue;
810 }
811 if consumer_empty && *id != mix_all::MASTER_ID {
812 continue;
813 }
814 self.send_heartbeat_to_broker_inner(*id, broker_name, addr, &heartbeat_data)
815 .await;
816 }
817 }
818
819 true
820 }
821
822 pub async fn send_heartbeat_to_broker(
823 &self,
824 id: u64,
825 broker_name: &CheetahString,
826 addr: &CheetahString,
827 ) -> bool {
828 if let Some(lock) = self.lock_heartbeat.try_lock().await {
829 let heartbeat_data = self.prepare_heartbeat_data(false).await;
830 let producer_empty = heartbeat_data.producer_data_set.is_empty();
831 let consumer_empty = heartbeat_data.consumer_data_set.is_empty();
832 if producer_empty && consumer_empty {
833 warn!(
834 "sending heartbeat, but no consumer and no producer. [{}]",
835 self.client_id
836 );
837 return false;
838 }
839
840 if self.client_config.use_heartbeat_v2 {
841 unimplemented!("sendHeartbeatToBrokerV2")
842 } else {
843 self.send_heartbeat_to_broker_inner(id, broker_name, addr, &heartbeat_data)
844 .await
845 }
846 } else {
847 false
848 }
849 }
850
851 async fn send_heartbeat_to_broker_inner(
852 &self,
853 id: u64,
854 broker_name: &CheetahString,
855 addr: &CheetahString,
856 heartbeat_data: &HeartbeatData,
857 ) -> bool {
858 if let Ok(version) = self
859 .mq_client_api_impl
860 .as_ref()
861 .unwrap()
862 .mut_from_ref()
863 .send_heartbeat(
864 addr,
865 heartbeat_data,
866 self.client_config.mq_client_api_timeout,
867 )
868 .await
869 {
870 let mut broker_version_table = self.broker_version_table.write().await;
871 let map = broker_version_table.get_mut(broker_name);
872 if let Some(map) = map {
873 map.insert(addr.clone(), version);
874 } else {
875 let mut map = HashMap::new();
876 map.insert(addr.clone(), version);
877 broker_version_table.insert(broker_name.clone(), map);
878 }
879
880 let times = self
881 .send_heartbeat_times_total
882 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
883 if times % 20 == 0 {
884 info!(
885 "send heart beat to broker[{} {} {}] success",
886 broker_name, id, addr,
887 );
888 }
889 return true;
890 }
891 if self.is_broker_in_name_server(addr).await {
892 warn!(
893 "send heart beat to broker[{} {} {}] failed",
894 broker_name, id, addr
895 );
896 } else {
897 warn!(
898 "send heart beat to broker[{} {} {}] exception, because the broker not up, forget \
899 it",
900 broker_name, id, addr
901 )
902 }
903 false
904 }
905
906 async fn is_broker_in_name_server(&self, broker_name: &str) -> bool {
907 let broker_addr_table = self.topic_route_table.read().await;
908 for (_, value) in broker_addr_table.iter() {
909 for bd in value.broker_datas.iter() {
910 for (_, value) in bd.broker_addrs().iter() {
911 if value.as_str() == broker_name {
912 return true;
913 }
914 }
915 }
916 }
917 false
918 }
919
920 async fn prepare_heartbeat_data(&self, is_without_sub: bool) -> HeartbeatData {
921 let mut heartbeat_data = HeartbeatData {
922 client_id: self.client_id.clone(),
923 ..Default::default()
924 };
925
926 let consumer_table = self.consumer_table.read().await;
927 for (_, value) in consumer_table.iter() {
928 let mut consumer_data = ConsumerData {
929 group_name: value.group_name(),
930 consume_type: value.consume_type(),
931 message_model: value.message_model(),
932 consume_from_where: value.consume_from_where(),
933 subscription_data_set: value.subscriptions(),
934 unit_mode: value.is_unit_mode(),
935 };
936 if !is_without_sub {
937 value.subscriptions().iter().for_each(|sub| {
938 consumer_data.subscription_data_set.insert(sub.clone());
939 });
940 }
941 heartbeat_data.consumer_data_set.insert(consumer_data);
942 }
943 drop(consumer_table);
944 let producer_table = self.producer_table.read().await;
945 for (group_name, _) in producer_table.iter() {
946 let producer_data = ProducerData {
947 group_name: group_name.clone(),
948 };
949 heartbeat_data.producer_data_set.insert(producer_data);
950 }
951 drop(producer_table);
952 heartbeat_data.is_without_sub = is_without_sub;
953 heartbeat_data
954 }
955
956 pub async fn register_consumer(
957 &mut self,
958 group: &CheetahString,
959 consumer: MQConsumerInnerImpl,
960 ) -> bool {
961 let mut consumer_table = self.consumer_table.write().await;
962 if consumer_table.contains_key(group) {
963 warn!("the consumer group[{}] exist already.", group);
964 return false;
965 }
966 consumer_table.insert(group.clone(), consumer);
967 true
968 }
969
970 pub async fn check_client_in_broker(&mut self) -> rocketmq_error::RocketMQResult<()> {
971 let consumer_table = self.consumer_table.read().await;
972 for (key, value) in consumer_table.iter() {
973 let subscription_inner = value.subscriptions();
974 if subscription_inner.is_empty() {
975 return Ok(());
976 }
977 for subscription_data in subscription_inner.iter() {
978 if ExpressionType::is_tag_type(Some(subscription_data.expression_type.as_str())) {
979 continue;
980 }
981 let addr = self
982 .find_broker_addr_by_topic(subscription_data.topic.as_str())
983 .await;
984 if let Some(addr) = addr {
985 match self
986 .mq_client_api_impl
987 .as_mut()
988 .unwrap()
989 .check_client_in_broker(
990 addr.as_str(),
991 key,
992 self.client_id.as_str(),
993 subscription_data,
994 self.client_config.mq_client_api_timeout,
995 )
996 .await
997 {
998 Ok(_) => {}
999 Err(e) => match e {
1000 rocketmq_error::RocketmqError::MQClientErr(err) => {
1001 return Err(rocketmq_error::RocketmqError::MQClientErr(err));
1002 }
1003 _ => {
1004 let desc = format!(
1005 "Check client in broker error, maybe because you use {} to \
1006 filter message, but server has not been upgraded to \
1007 support!This error would not affect the launch of consumer, \
1008 but may has impact on message receiving if you have use the \
1009 new features which are not supported by server, please check \
1010 the log!",
1011 subscription_data.expression_type
1012 );
1013 return mq_client_err!(desc);
1014 }
1015 },
1016 }
1017 }
1018 }
1019 }
1020
1021 Ok(())
1022 }
1023
1024 pub async fn do_rebalance(&mut self) -> bool {
1025 let mut balanced = true;
1026 let consumer_table = self.consumer_table.read().await;
1027 for (key, value) in consumer_table.iter() {
1028 match value.try_rebalance().await {
1029 Ok(result) => {
1030 if !result {
1031 balanced = false;
1032 }
1033 }
1034 Err(e) => {
1035 error!(
1036 "doRebalance for consumer group [{}] exception:{}",
1037 key,
1038 e.to_string()
1039 );
1040 }
1041 }
1042 }
1043 balanced
1044 }
1045
1046 pub fn rebalance_later(&mut self, delay_millis: u64) {
1047 if delay_millis == 0 {
1048 self.rebalance_service.wakeup();
1049 } else {
1050 let service = self.rebalance_service.clone();
1051 self.instance_runtime.get_handle().spawn(async move {
1052 tokio::time::sleep(Duration::from_millis(delay_millis)).await;
1053 service.wakeup();
1054 });
1055 }
1056 }
1057
1058 pub async fn find_broker_address_in_subscribe(
1059 &mut self,
1060 broker_name: &CheetahString,
1061 broker_id: u64,
1062 only_this_broker: bool,
1063 ) -> Option<FindBrokerResult> {
1064 if broker_name.is_empty() {
1065 return None;
1066 }
1067 let broker_addr_table = self.broker_addr_table.read().await;
1068 let map = broker_addr_table.get(broker_name);
1069 let mut broker_addr = None;
1070 let mut slave = false;
1071 let mut found = false;
1072
1073 if let Some(map) = map {
1074 broker_addr = map.get(&broker_id);
1075 slave = broker_id != mix_all::MASTER_ID;
1076 found = broker_addr.is_some();
1077 if !found && slave {
1078 broker_addr = map.get(&(broker_id + 1));
1079 found = broker_addr.is_some();
1080 }
1081 if !found && !only_this_broker {
1082 if let Some((key, value)) = map.iter().next() {
1083 slave = *key != mix_all::MASTER_ID;
1085 found = !value.is_empty();
1086 }
1087 }
1088 }
1089 if found {
1090 let broker_addr = broker_addr.cloned()?;
1091 let broker_version = self
1092 .find_broker_version(broker_name, broker_addr.as_str())
1093 .await;
1094 Some(FindBrokerResult {
1095 broker_addr,
1096 slave,
1097 broker_version,
1098 })
1099 } else {
1100 None
1101 }
1102 }
1103 async fn find_broker_version(&self, broker_name: &str, broker_addr: &str) -> i32 {
1104 let broker_version_table = self.broker_version_table.read().await;
1105 if let Some(map) = broker_version_table.get(broker_name) {
1106 if let Some(version) = map.get(broker_addr) {
1107 return *version;
1108 }
1109 }
1110 0
1111 }
1112
1113 pub async fn select_consumer(&self, group: &str) -> Option<MQConsumerInnerImpl> {
1114 let consumer_table = self.consumer_table.read().await;
1115 consumer_table.get(group).cloned()
1116 }
1117
1118 pub async fn select_producer(&self, group: &str) -> Option<MQProducerInnerImpl> {
1119 let producer_table = self.producer_table.read().await;
1120 producer_table.get(group).cloned()
1121 }
1122
1123 pub async fn unregister_consumer(&mut self, group: impl Into<CheetahString>) {
1124 self.unregister_client(None, Some(group.into())).await;
1125 }
1126 pub async fn unregister_producer(&mut self, group: impl Into<CheetahString>) {
1127 self.unregister_client(Some(group.into()), None).await;
1128 }
1129
1130 pub async fn unregister_admin_ext(&mut self, group: impl Into<CheetahString>) {
1131 let mut write_guard = self.admin_ext_table.write().await;
1132 let _ = write_guard.remove(&group.into());
1133 }
1134 async fn unregister_client(
1135 &mut self,
1136 producer_group: Option<CheetahString>,
1137 consumer_group: Option<CheetahString>,
1138 ) {
1139 let broker_addr_table = self.broker_addr_table.read().await;
1140 for (broker_name, broker_addrs) in broker_addr_table.iter() {
1141 for (id, addr) in broker_addrs.iter() {
1142 if let Err(err) = self
1143 .mq_client_api_impl
1144 .as_mut()
1145 .unwrap()
1146 .unregister_client(
1147 addr,
1148 self.client_id.clone(),
1149 producer_group.clone(),
1150 consumer_group.clone(),
1151 self.client_config.mq_client_api_timeout,
1152 )
1153 .await
1154 {
1155 } else {
1156 info!(
1157 "unregister client[Producer: {:?} Consumer: {:?}] from broker[{} {} {}] \
1158 success",
1159 producer_group, consumer_group, broker_name, id, addr,
1160 );
1161 }
1162 }
1163 }
1164 }
1165
1166 async fn is_broker_addr_exist_in_topic_route_table(&self, addr: &str) -> bool {
1167 let topic_route_table = self.topic_route_table.read().await;
1168 for (_, value) in topic_route_table.iter() {
1169 for bd in value.broker_datas.iter() {
1170 for (_, value) in bd.broker_addrs().iter() {
1171 if value.as_str() == addr {
1172 return true;
1173 }
1174 }
1175 }
1176 }
1177 false
1178 }
1179
1180 pub async fn query_assignment(
1200 &mut self,
1201 topic: &CheetahString,
1202 consumer_group: &CheetahString,
1203 strategy_name: &CheetahString,
1204 message_model: MessageModel,
1205 timeout: u64,
1206 ) -> rocketmq_error::RocketMQResult<Option<HashSet<MessageQueueAssignment>>> {
1207 let mut broker_addr = self.find_broker_addr_by_topic(topic).await;
1209
1210 if broker_addr.is_none() {
1212 self.update_topic_route_info_from_name_server_topic(topic)
1213 .await;
1214 broker_addr = self.find_broker_addr_by_topic(topic).await;
1215 }
1216 if let Some(broker_addr) = broker_addr {
1217 let client_id = self.client_id.clone();
1218 match self.mq_client_api_impl.as_mut() {
1219 Some(api_impl) => {
1220 api_impl
1221 .query_assignment(
1222 &broker_addr,
1223 topic.clone(),
1224 consumer_group.clone(),
1225 client_id,
1226 strategy_name.clone(),
1227 message_model,
1228 timeout,
1229 )
1230 .await
1231 }
1232 None => mq_client_err!("mq_client_api_impl is None"),
1233 }
1234 } else {
1235 Ok(None)
1236 }
1237 }
1238
1239 pub async fn consume_message_directly(
1240 &self,
1241 message: MessageExt,
1242 consumer_group: &CheetahString,
1243 broker_name: Option<CheetahString>,
1244 ) -> Option<ConsumeMessageDirectlyResult> {
1245 let consumer_table = self.consumer_table.read().await;
1246 let consumer_inner = consumer_table.get(consumer_group);
1247 if let Some(consumer) = consumer_inner {
1248 consumer
1249 .consume_message_directly(message, broker_name)
1250 .await;
1251 }
1252
1253 None
1254 }
1255}
1256
1257pub fn topic_route_data2topic_publish_info(
1258 topic: &str,
1259 route: &mut TopicRouteData,
1260) -> TopicPublishInfo {
1261 let mut info = TopicPublishInfo {
1262 topic_route_data: Some(route.clone()),
1263 ..Default::default()
1264 };
1265 if route.order_topic_conf.is_some() && !route.order_topic_conf.as_ref().unwrap().is_empty() {
1266 let brokers = route
1267 .order_topic_conf
1268 .as_ref()
1269 .unwrap()
1270 .split(";")
1271 .map(|s| s.to_string())
1272 .collect::<Vec<String>>();
1273 for broker in brokers {
1274 let item = broker.split(":").collect::<Vec<&str>>();
1275 if item.len() == 2 {
1276 let queue_num = item[1].parse::<i32>().unwrap();
1277 for i in 0..queue_num {
1278 let mq = MessageQueue::from_parts(topic, item[0], i);
1279 info.message_queue_list.push(mq);
1280 }
1281 }
1282 }
1283 info.order_topic = true;
1284 } else if route.order_topic_conf.is_none()
1285 && route.topic_queue_mapping_by_broker.is_some()
1286 && !route
1287 .topic_queue_mapping_by_broker
1288 .as_ref()
1289 .unwrap()
1290 .is_empty()
1291 {
1292 info.order_topic = false;
1293 let mq_end_points =
1294 ClientMetadata::topic_route_data2endpoints_for_static_topic(topic, route);
1295 if let Some(mq_end_points) = mq_end_points {
1296 for (mq, broker_name) in mq_end_points {
1297 info.message_queue_list.push(mq);
1298 }
1299 }
1300 info.message_queue_list
1301 .sort_by(|a, b| match a.get_queue_id().cmp(&b.get_queue_id()) {
1302 Ordering::Less => std::cmp::Ordering::Less,
1303 Ordering::Equal => std::cmp::Ordering::Equal,
1304 Ordering::Greater => std::cmp::Ordering::Greater,
1305 });
1306 } else {
1307 route.queue_datas.sort();
1308 for queue_data in route.queue_datas.iter() {
1309 if PermName::is_writeable(queue_data.perm) {
1310 let mut broker_data = None;
1311 for bd in route.broker_datas.iter() {
1312 if bd.broker_name() == queue_data.broker_name.as_str() {
1313 broker_data = Some(bd.clone());
1314 break;
1315 }
1316 }
1317 if broker_data.is_none() {
1318 continue;
1319 }
1320 if !broker_data
1321 .as_ref()
1322 .unwrap()
1323 .broker_addrs()
1324 .contains_key(&(mix_all::MASTER_ID))
1325 {
1326 continue;
1327 }
1328 for i in 0..queue_data.write_queue_nums {
1329 let mq =
1330 MessageQueue::from_parts(topic, queue_data.broker_name.as_str(), i as i32);
1331 info.message_queue_list.push(mq);
1332 }
1333 }
1334 }
1335 }
1336 info
1337}
1338
1339pub fn topic_route_data2topic_subscribe_info(
1340 topic: &str,
1341 route: &TopicRouteData,
1342) -> HashSet<MessageQueue> {
1343 if let Some(ref topic_queue_mapping_by_broker) = route.topic_queue_mapping_by_broker {
1344 if !topic_queue_mapping_by_broker.is_empty() {
1345 let mq_endpoints =
1346 ClientMetadata::topic_route_data2endpoints_for_static_topic(topic, route);
1347 return mq_endpoints
1348 .unwrap_or_default()
1349 .keys()
1350 .cloned()
1351 .collect::<HashSet<MessageQueue>>();
1352 }
1353 }
1354 let mut mq_list = HashSet::new();
1355 for qd in &route.queue_datas {
1356 if PermName::is_readable(qd.perm) {
1357 for i in 0..qd.read_queue_nums {
1358 let mq = MessageQueue::from_parts(topic, qd.broker_name.as_str(), i as i32);
1359 mq_list.insert(mq);
1360 }
1361 }
1362 }
1363 mq_list
1364}