// rocketmq_client_rust/factory/mq_client_instance.rs

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17use std::cmp::Ordering;
18use std::collections::HashMap;
19use std::collections::HashSet;
20use std::sync::atomic::AtomicI64;
21use std::sync::Arc;
22use std::thread;
23use std::time::Duration;
24
25use cheetah_string::CheetahString;
26use rand::seq::IndexedRandom;
27use rocketmq_common::common::base::service_state::ServiceState;
28use rocketmq_common::common::constant::PermName;
29use rocketmq_common::common::filter::expression_type::ExpressionType;
30use rocketmq_common::common::message::message_ext::MessageExt;
31use rocketmq_common::common::message::message_queue::MessageQueue;
32use rocketmq_common::common::message::message_queue_assignment::MessageQueueAssignment;
33use rocketmq_common::common::mix_all;
34use rocketmq_common::TimeUtils::get_current_millis;
35use rocketmq_error::mq_client_err;
36use rocketmq_remoting::base::connection_net_event::ConnectionNetEvent;
37use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;
38use rocketmq_remoting::protocol::heartbeat::consumer_data::ConsumerData;
39use rocketmq_remoting::protocol::heartbeat::heartbeat_data::HeartbeatData;
40use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
41use rocketmq_remoting::protocol::heartbeat::producer_data::ProducerData;
42use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData;
43use rocketmq_remoting::rpc::client_metadata::ClientMetadata;
44use rocketmq_remoting::runtime::config::client_config::TokioClientConfig;
45use rocketmq_remoting::runtime::RPCHook;
46use rocketmq_runtime::RocketMQRuntime;
47use rocketmq_rust::ArcMut;
48use rocketmq_rust::RocketMQTokioMutex;
49use tokio::runtime::Handle;
50use tokio::sync::RwLock;
51use tracing::error;
52use tracing::info;
53use tracing::warn;
54
55use crate::admin::mq_admin_ext_async_inner::MQAdminExtInnerImpl;
56use crate::base::client_config::ClientConfig;
57use crate::consumer::consumer_impl::pull_message_service::PullMessageService;
58use crate::consumer::consumer_impl::re_balance::rebalance_service::RebalanceService;
59use crate::consumer::mq_consumer_inner::MQConsumerInner;
60use crate::consumer::mq_consumer_inner::MQConsumerInnerImpl;
61use crate::implementation::client_remoting_processor::ClientRemotingProcessor;
62use crate::implementation::find_broker_result::FindBrokerResult;
63use crate::implementation::mq_admin_impl::MQAdminImpl;
64use crate::implementation::mq_client_api_impl::MQClientAPIImpl;
65use crate::producer::default_mq_producer::DefaultMQProducer;
66use crate::producer::default_mq_producer::ProducerConfig;
67use crate::producer::producer_impl::mq_producer_inner::MQProducerInnerImpl;
68use crate::producer::producer_impl::topic_publish_info::TopicPublishInfo;
69
/// Upper bound, in milliseconds, for waiting on `lock_namesrv` in
/// `MQClientInstance::clean_offline_broker`.
const LOCK_TIMEOUT_MILLIS: u64 = 3_000;
71
72pub struct MQClientInstance {
73    pub(crate) client_config: ArcMut<ClientConfig>,
74    pub(crate) client_id: CheetahString,
75    boot_timestamp: u64,
76    /**
77     * The container of the producer in the current client. The key is the name of
78     * producerGroup.
79     */
80    producer_table: Arc<RwLock<HashMap<CheetahString, MQProducerInnerImpl>>>,
81    /**
82     * The container of the consumer in the current client. The key is the name of
83     * consumer_group.
84     */
85    consumer_table: Arc<RwLock<HashMap<CheetahString, MQConsumerInnerImpl>>>,
86    /**
87     * The container of the adminExt in the current client. The key is the name of
88     * adminExtGroup.
89     */
90    admin_ext_table: Arc<RwLock<HashMap<CheetahString, MQAdminExtInnerImpl>>>,
91    pub(crate) mq_client_api_impl: Option<ArcMut<MQClientAPIImpl>>,
92    pub(crate) mq_admin_impl: ArcMut<MQAdminImpl>,
93    pub(crate) topic_route_table: Arc<RwLock<HashMap<CheetahString /* Topic */, TopicRouteData>>>,
94    topic_end_points_table: Arc<
95        RwLock<
96            HashMap<
97                CheetahString, /* Topic */
98                HashMap<MessageQueue, CheetahString /* brokerName */>,
99            >,
100        >,
101    >,
102    lock_namesrv: Arc<RocketMQTokioMutex<()>>,
103    lock_heartbeat: Arc<RocketMQTokioMutex<()>>,
104
105    service_state: ServiceState,
106    pub(crate) pull_message_service: ArcMut<PullMessageService>,
107    rebalance_service: RebalanceService,
108    pub(crate) default_producer: ArcMut<DefaultMQProducer>,
109    instance_runtime: Arc<RocketMQRuntime>,
110    broker_addr_table: Arc<RwLock<HashMap<CheetahString, HashMap<u64, CheetahString>>>>,
111    broker_version_table: Arc<
112        RwLock<
113            HashMap<
114                CheetahString, /* Broker Name */
115                HashMap<CheetahString /* address */, i32>,
116            >,
117        >,
118    >,
119    send_heartbeat_times_total: Arc<AtomicI64>,
120}
121
122impl MQClientInstance {
123    pub fn new(
124        client_config: ClientConfig,
125        instance_index: i32,
126        client_id: String,
127        rpc_hook: Option<Arc<Box<dyn RPCHook>>>,
128    ) -> Self {
129        /* let broker_addr_table = Arc::new(Default::default());
130        let (tx, _) = tokio::sync::broadcast::channel::<ConnectionNetEvent>(16);
131        let rx = tx.subscribe();
132        let mq_client_api_impl = ArcMut::new(MQClientAPIImpl::new(
133            Arc::new(TokioClientConfig::default()),
134            ClientRemotingProcessor::new(),
135            rpc_hook,
136            client_config.clone(),
137            Some(tx),
138        ));
139        if let Some(namesrv_addr) = client_config.namesrv_addr.as_deref() {
140            let handle = Handle::current();
141            let mq_client_api_impl_cloned = mq_client_api_impl.clone();
142            let namesrv_addr = namesrv_addr.to_string();
143            thread::spawn(move || {
144                handle.block_on(async move {
145                    mq_client_api_impl_cloned
146                        .update_name_server_address_list(namesrv_addr.as_str())
147                        .await;
148                })
149            });
150        }
151        let instance = MQClientInstance {
152            client_config: Arc::new(client_config.clone()),
153            client_id,
154            boot_timestamp: get_current_millis(),
155            producer_table: Arc::new(RwLock::new(HashMap::new())),
156            consumer_table: Arc::new(Default::default()),
157            admin_ext_table: Arc::new(Default::default()),
158            mq_client_api_impl:None,
159            mq_admin_impl: ArcMut::new(MQAdminImpl::new()),
160            topic_route_table: Arc::new(Default::default()),
161            topic_end_points_table: Arc::new(Default::default()),
162            lock_namesrv: Default::default(),
163            lock_heartbeat: Default::default(),
164            service_state: ServiceState::CreateJust,
165            pull_message_service: ArcMut::new(PullMessageService::new()),
166            rebalance_service: RebalanceService::new(),
167            default_producer: ArcMut::new(
168                DefaultMQProducer::builder()
169                    .producer_group(mix_all::CLIENT_INNER_PRODUCER_GROUP)
170                    .client_config(client_config.clone())
171                    .build(),
172            ),
173            instance_runtime: Arc::new(RocketMQRuntime::new_multi(
174                num_cpus::get(),
175                "mq-client-instance",
176            )),
177            broker_addr_table,
178            broker_version_table: Arc::new(Default::default()),
179            send_heartbeat_times_total: Arc::new(AtomicI64::new(0)),
180            tx: Some(rx),
181        };
182        // let instance_ = instance.clone();
183
184        instance*/
185        unimplemented!()
186    }
187
188    pub fn new_arc(
189        client_config: ClientConfig,
190        instance_index: i32,
191        client_id: impl Into<CheetahString>,
192        rpc_hook: Option<Arc<Box<dyn RPCHook>>>,
193    ) -> ArcMut<MQClientInstance> {
194        let broker_addr_table = Arc::new(Default::default());
195        let mut instance = ArcMut::new(MQClientInstance {
196            client_config: ArcMut::new(client_config.clone()),
197            client_id: client_id.into(),
198            boot_timestamp: get_current_millis(),
199            producer_table: Arc::new(RwLock::new(HashMap::new())),
200            consumer_table: Arc::new(Default::default()),
201            admin_ext_table: Arc::new(Default::default()),
202            mq_client_api_impl: None,
203            mq_admin_impl: ArcMut::new(MQAdminImpl::new()),
204            topic_route_table: Arc::new(Default::default()),
205            topic_end_points_table: Arc::new(Default::default()),
206            lock_namesrv: Default::default(),
207            lock_heartbeat: Default::default(),
208            service_state: ServiceState::CreateJust,
209            pull_message_service: ArcMut::new(PullMessageService::new()),
210            rebalance_service: RebalanceService::new(),
211            default_producer: ArcMut::new(
212                DefaultMQProducer::builder()
213                    .producer_group(mix_all::CLIENT_INNER_PRODUCER_GROUP)
214                    .client_config(client_config.clone())
215                    .build(),
216            ),
217            instance_runtime: Arc::new(RocketMQRuntime::new_multi(
218                num_cpus::get(),
219                "mq-client-instance",
220            )),
221            broker_addr_table,
222            broker_version_table: Arc::new(Default::default()),
223            send_heartbeat_times_total: Arc::new(AtomicI64::new(0)),
224        });
225        let instance_clone = instance.clone();
226        instance.mq_admin_impl.set_client(instance_clone);
227        let weak_instance = ArcMut::downgrade(&instance);
228        let (tx, mut rx) = tokio::sync::broadcast::channel::<ConnectionNetEvent>(16);
229
230        let mq_client_api_impl = ArcMut::new(MQClientAPIImpl::new(
231            Arc::new(TokioClientConfig::default()),
232            ClientRemotingProcessor::new(instance.clone()),
233            rpc_hook,
234            client_config.clone(),
235            Some(tx),
236        ));
237        instance.mq_client_api_impl = Some(mq_client_api_impl.clone());
238        if let Some(namesrv_addr) = client_config.namesrv_addr.as_deref() {
239            let handle = Handle::current();
240
241            let namesrv_addr = namesrv_addr.to_string();
242            thread::spawn(move || {
243                handle.block_on(async move {
244                    mq_client_api_impl
245                        .update_name_server_address_list(namesrv_addr.as_str())
246                        .await;
247                })
248            });
249        }
250        tokio::spawn(async move {
251            while let Ok(value) = rx.recv().await {
252                if let Some(instance_) = weak_instance.upgrade() {
253                    match value {
254                        ConnectionNetEvent::CONNECTED(remote_address) => {
255                            info!("ConnectionNetEvent CONNECTED");
256                            let broker_addr_table = instance_.broker_addr_table.read().await;
257                            for (broker_name, broker_addrs) in broker_addr_table.iter() {
258                                for (id, addr) in broker_addrs.iter() {
259                                    if addr == remote_address.to_string().as_str()
260                                        && instance_
261                                            .send_heartbeat_to_broker(*id, broker_name, addr)
262                                            .await
263                                    {
264                                        instance_.re_balance_immediately();
265                                    }
266                                }
267                            }
268                        }
269                        ConnectionNetEvent::DISCONNECTED => {}
270                        ConnectionNetEvent::EXCEPTION => {}
271                    }
272                }
273            }
274            warn!("ConnectionNetEvent recv error");
275        });
276        instance
277    }
278
279    pub fn re_balance_immediately(&self) {
280        self.rebalance_service.wakeup();
281    }
282
283    pub fn re_balance_later(&self, delay_millis: Duration) {
284        if delay_millis <= Duration::from_millis(0) {
285            self.rebalance_service.wakeup();
286        } else {
287            let service = self.rebalance_service.clone();
288            tokio::spawn(async move {
289                tokio::time::sleep(delay_millis).await;
290                service.wakeup();
291            });
292        }
293    }
294
295    pub async fn start(&mut self, this: ArcMut<Self>) -> rocketmq_error::RocketMQResult<()> {
296        match self.service_state {
297            ServiceState::CreateJust => {
298                self.service_state = ServiceState::StartFailed;
299                // If not specified,looking address from name remoting_server
300                if self.client_config.namesrv_addr.is_none() {
301                    self.mq_client_api_impl
302                        .as_mut()
303                        .expect("mq_client_api_impl is None")
304                        .fetch_name_server_addr()
305                        .await;
306                }
307                // Start request-response channel
308                self.mq_client_api_impl
309                    .as_mut()
310                    .expect("mq_client_api_impl is None")
311                    .start()
312                    .await;
313                // Start various schedule tasks
314                self.start_scheduled_task(this.clone());
315                // Start pull service
316                let instance = this.clone();
317                self.pull_message_service.start(instance).await;
318                // Start rebalance service
319                self.rebalance_service.start(this).await;
320                // Start push service
321
322                self.default_producer
323                    .default_mqproducer_impl
324                    .as_mut()
325                    .unwrap()
326                    .start_with_factory(false)
327                    .await?;
328                info!("the client factory[{}] start OK", self.client_id);
329                self.service_state = ServiceState::Running;
330            }
331            ServiceState::Running => {}
332            ServiceState::ShutdownAlready => {}
333            ServiceState::StartFailed => {
334                return mq_client_err!(format!(
335                    "The Factory object[{}] has been created before, and failed.",
336                    self.client_id
337                ));
338            }
339        }
340        Ok(())
341    }
342
343    pub async fn shutdown(&mut self) {}
344
345    pub async fn register_producer(&mut self, group: &str, producer: MQProducerInnerImpl) -> bool {
346        if group.is_empty() {
347            return false;
348        }
349        let mut producer_table = self.producer_table.write().await;
350        if producer_table.contains_key(group) {
351            warn!("the producer group[{}] exist already.", group);
352            return false;
353        }
354        producer_table.insert(group.into(), producer);
355        true
356    }
357
358    pub async fn register_admin_ext(&mut self, group: &str, admin: MQAdminExtInnerImpl) -> bool {
359        if group.is_empty() {
360            return false;
361        }
362        let mut admin_ext_table = self.admin_ext_table.write().await;
363        if admin_ext_table.contains_key(group) {
364            warn!("the admin group[{}] exist already.", group);
365            return false;
366        }
367        admin_ext_table.insert(group.into(), admin);
368        true
369    }
370
371    fn start_scheduled_task(&mut self, this: ArcMut<Self>) {
372        if self.client_config.namesrv_addr.is_none() {
373            // Fetch name server address
374            let mut mq_client_api_impl = self.mq_client_api_impl.as_ref().unwrap().clone();
375            self.instance_runtime.get_handle().spawn(async move {
376                info!("ScheduledTask fetchNameServerAddr started");
377                tokio::time::sleep(Duration::from_secs(10)).await;
378                loop {
379                    let current_execution_time = tokio::time::Instant::now();
380                    mq_client_api_impl.fetch_name_server_addr().await;
381                    let next_execution_time = current_execution_time + Duration::from_secs(120);
382                    let delay =
383                        next_execution_time.saturating_duration_since(tokio::time::Instant::now());
384                    tokio::time::sleep(delay).await;
385                }
386            });
387        }
388
389        // Update topic route info from name server
390        let mut client_instance = this.clone();
391        let poll_name_server_interval = self.client_config.poll_name_server_interval;
392        self.instance_runtime.get_handle().spawn(async move {
393            info!("ScheduledTask update_topic_route_info_from_name_server started");
394            tokio::time::sleep(Duration::from_millis(10)).await;
395            loop {
396                let current_execution_time = tokio::time::Instant::now();
397                client_instance
398                    .update_topic_route_info_from_name_server()
399                    .await;
400                let next_execution_time = current_execution_time
401                    + Duration::from_millis(poll_name_server_interval as u64);
402                let delay =
403                    next_execution_time.saturating_duration_since(tokio::time::Instant::now());
404                tokio::time::sleep(delay).await;
405            }
406        });
407
408        // Clean offline broker and send heartbeat to all broker
409        let mut client_instance = this.clone();
410        let heartbeat_broker_interval = self.client_config.heartbeat_broker_interval;
411        self.instance_runtime.get_handle().spawn(async move {
412            info!("ScheduledTask clean_offline_broker started");
413            tokio::time::sleep(Duration::from_secs(1)).await;
414            loop {
415                let current_execution_time = tokio::time::Instant::now();
416                client_instance.clean_offline_broker().await;
417                client_instance
418                    .send_heartbeat_to_all_broker_with_lock()
419                    .await;
420                let next_execution_time = current_execution_time
421                    + Duration::from_millis(heartbeat_broker_interval as u64);
422                let delay =
423                    next_execution_time.saturating_duration_since(tokio::time::Instant::now());
424                tokio::time::sleep(delay).await;
425            }
426        });
427
428        // Persist all consumer offset
429        let mut client_instance = this;
430        let persist_consumer_offset_interval =
431            self.client_config.persist_consumer_offset_interval as u64;
432        self.instance_runtime.get_handle().spawn(async move {
433            info!("ScheduledTask persistAllConsumerOffset started");
434            tokio::time::sleep(Duration::from_secs(10)).await;
435            loop {
436                let current_execution_time = tokio::time::Instant::now();
437                client_instance.persist_all_consumer_offset().await;
438                let next_execution_time = current_execution_time
439                    + Duration::from_millis(persist_consumer_offset_interval);
440                let delay =
441                    next_execution_time.saturating_duration_since(tokio::time::Instant::now());
442                tokio::time::sleep(delay).await;
443            }
444        });
445    }
446
447    pub async fn update_topic_route_info_from_name_server(&mut self) {
448        let mut topic_list = HashSet::new();
449
450        {
451            let producer_table = self.producer_table.read().await;
452            for (_, value) in producer_table.iter() {
453                topic_list.extend(value.get_publish_topic_list());
454            }
455        }
456
457        {
458            let consumer_table = self.consumer_table.read().await;
459            for (_, value) in consumer_table.iter() {
460                value.subscriptions().iter().for_each(|sub| {
461                    topic_list.insert(sub.topic.clone());
462                });
463            }
464        }
465
466        for topic in topic_list.iter() {
467            self.update_topic_route_info_from_name_server_topic(topic)
468                .await;
469        }
470    }
471
472    #[inline]
473    pub async fn update_topic_route_info_from_name_server_topic(
474        &mut self,
475        topic: &CheetahString,
476    ) -> bool {
477        self.update_topic_route_info_from_name_server_default(topic, false, None)
478            .await
479    }
480
481    pub async fn find_consumer_id_list(
482        &mut self,
483        topic: &CheetahString,
484        group: &CheetahString,
485    ) -> Option<Vec<CheetahString>> {
486        let mut broker_addr = self.find_broker_addr_by_topic(topic).await;
487        if broker_addr.is_none() {
488            self.update_topic_route_info_from_name_server_topic(topic)
489                .await;
490            broker_addr = self.find_broker_addr_by_topic(topic).await;
491        }
492        if let Some(broker_addr) = broker_addr {
493            match self
494                .mq_client_api_impl
495                .as_mut()
496                .unwrap()
497                .get_consumer_id_list_by_group(
498                    broker_addr.as_str(),
499                    group,
500                    self.client_config.mq_client_api_timeout,
501                )
502                .await
503            {
504                Ok(value) => return Some(value),
505                Err(e) => {
506                    warn!(
507                        "getConsumerIdListByGroup exception,{}  {}, err:{}",
508                        broker_addr,
509                        group,
510                        e.to_string()
511                    );
512                }
513            }
514        }
515        None
516    }
517
518    pub async fn find_broker_addr_by_topic(&self, topic: &str) -> Option<CheetahString> {
519        let topic_route_table = self.topic_route_table.read().await;
520        if let Some(topic_route_data) = topic_route_table.get(topic) {
521            let brokers = &topic_route_data.broker_datas;
522            if !brokers.is_empty() {
523                let bd = brokers.choose(&mut rand::rng());
524                if let Some(bd) = bd {
525                    return bd.select_broker_addr();
526                }
527            }
528        }
529        None
530    }
531
532    pub async fn update_topic_route_info_from_name_server_default(
533        &mut self,
534        topic: &CheetahString,
535        is_default: bool,
536        producer_config: Option<&Arc<ProducerConfig>>,
537    ) -> bool {
538        let lock = self.lock_namesrv.lock().await;
539        let topic_route_data = if let (true, Some(producer_config)) = (is_default, producer_config)
540        {
541            let mut result = match self
542                .mq_client_api_impl
543                .as_mut()
544                .unwrap()
545                .get_default_topic_route_info_from_name_server(
546                    self.client_config.mq_client_api_timeout,
547                )
548                .await
549            {
550                Ok(value) => value,
551                Err(e) => {
552                    error!(
553                        "get_default_topic_route_info_from_name_server failed, topic: {}, error: \
554                         {}",
555                        topic, e
556                    );
557                    None
558                }
559            };
560            if let Some(topic_route_data) = result.as_mut() {
561                for data in topic_route_data.queue_datas.iter_mut() {
562                    let queue_nums = producer_config
563                        .default_topic_queue_nums()
564                        .max(data.read_queue_nums);
565                    data.read_queue_nums = queue_nums;
566                    data.write_queue_nums = queue_nums;
567                }
568            }
569            result
570        } else {
571            self.mq_client_api_impl
572                .as_mut()
573                .unwrap()
574                .get_topic_route_info_from_name_server(
575                    topic,
576                    self.client_config.mq_client_api_timeout,
577                )
578                .await
579                .unwrap_or(None)
580        };
581        if let Some(mut topic_route_data) = topic_route_data {
582            let mut topic_route_table = self.topic_route_table.write().await;
583            let old = topic_route_table.get(topic);
584            let mut changed = topic_route_data.topic_route_data_changed(old);
585            if !changed {
586                changed = self.is_need_update_topic_route_info(topic).await;
587            } else {
588                info!(
589                    "the topic[{}] route info changed, old[{:?}] ,new[{:?}]",
590                    topic, old, topic_route_data
591                )
592            }
593            if changed {
594                let mut broker_addr_table = self.broker_addr_table.write().await;
595                for bd in topic_route_data.broker_datas.iter() {
596                    broker_addr_table.insert(bd.broker_name().clone(), bd.broker_addrs().clone());
597                }
598                drop(broker_addr_table);
599
600                // Update endpoint map
601                {
602                    let mq_end_points = ClientMetadata::topic_route_data2endpoints_for_static_topic(
603                        topic,
604                        &topic_route_data,
605                    );
606                    if let Some(mq_end_points) = mq_end_points {
607                        if !mq_end_points.is_empty() {
608                            let mut topic_end_points_table =
609                                self.topic_end_points_table.write().await;
610                            topic_end_points_table.insert(topic.into(), mq_end_points);
611                        }
612                    }
613                }
614
615                // Update Pub info
616                {
617                    let mut publish_info =
618                        topic_route_data2topic_publish_info(topic, &mut topic_route_data);
619                    publish_info.have_topic_router_info = true;
620                    let mut producer_table = self.producer_table.write().await;
621                    for (_, value) in producer_table.iter_mut() {
622                        value.update_topic_publish_info(
623                            topic.to_string(),
624                            Some(publish_info.clone()),
625                        );
626                    }
627                }
628
629                // Update sub info
630                {
631                    let consumer_table = self.consumer_table.read().await;
632                    if !consumer_table.is_empty() {
633                        let subscribe_info =
634                            topic_route_data2topic_subscribe_info(topic, &topic_route_data);
635                        for (_, value) in consumer_table.iter() {
636                            value
637                                .update_topic_subscribe_info(topic.clone(), &subscribe_info)
638                                .await;
639                        }
640                    }
641                }
642                let clone_topic_route_data = TopicRouteData::from_existing(&topic_route_data);
643                topic_route_table.insert(topic.clone(), clone_topic_route_data);
644                return true;
645            }
646        } else {
647            warn!(
648                "updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, \
649                 Topic: {}. [{}]",
650                topic, self.client_id
651            );
652        }
653
654        drop(lock);
655        false
656    }
657
658    async fn is_need_update_topic_route_info(&self, topic: &CheetahString) -> bool {
659        let mut result = false;
660        let producer_table = self.producer_table.read().await;
661        for (key, value) in producer_table.iter() {
662            if !result {
663                result = value.is_publish_topic_need_update(topic);
664                break;
665            }
666        }
667        if result {
668            return true;
669        }
670
671        let consumer_table = self.consumer_table.read().await;
672        for (key, value) in consumer_table.iter() {
673            if !result {
674                result = value.is_subscribe_topic_need_update(topic).await;
675                break;
676            }
677        }
678        result
679    }
680
681    pub async fn persist_all_consumer_offset(&mut self) {
682        let consumer_table = self.consumer_table.read().await;
683        for (_, value) in consumer_table.iter() {
684            value.persist_consumer_offset().await;
685        }
686    }
687
688    pub async fn clean_offline_broker(&mut self) {
689        let lock = self
690            .lock_namesrv
691            .try_lock_timeout(Duration::from_millis(LOCK_TIMEOUT_MILLIS))
692            .await;
693        if let Some(lock) = lock {
694            let mut broker_addr_table = self.broker_addr_table.write().await;
695            let mut updated_table = HashMap::with_capacity(broker_addr_table.len());
696            let mut broker_name_set = HashSet::new();
697            for (broker_name, one_table) in broker_addr_table.iter() {
698                let mut clone_addr_table = one_table.clone();
699                let mut remove_id_set = HashSet::new();
700                for (id, addr) in one_table.iter() {
701                    if !self.is_broker_addr_exist_in_topic_route_table(addr).await {
702                        remove_id_set.insert(*id);
703                    }
704                }
705                clone_addr_table.retain(|k, _| !remove_id_set.contains(k));
706                if clone_addr_table.is_empty() {
707                    info!(
708                        "the broker[{}] name's host is offline, remove it",
709                        broker_name
710                    );
711                    broker_name_set.insert(broker_name.clone());
712                } else {
713                    updated_table.insert(broker_name.clone(), clone_addr_table);
714                }
715            }
716            broker_addr_table.retain(|k, _| !broker_name_set.contains(k));
717            if !updated_table.is_empty() {
718                broker_addr_table.extend(updated_table);
719            }
720        }
721    }
722    pub async fn send_heartbeat_to_all_broker_with_lock(&mut self) -> bool {
723        if let Some(lock) = self.lock_heartbeat.try_lock().await {
724            if self.client_config.use_heartbeat_v2 {
725                self.send_heartbeat_to_all_broker_v2(false).await
726            } else {
727                self.send_heartbeat_to_all_broker().await
728            }
729        } else {
730            warn!("lock heartBeat, but failed. [{}]", self.client_id);
731            false
732        }
733    }
734
735    pub async fn send_heartbeat_to_all_broker_with_lock_v2(&mut self, is_rebalance: bool) -> bool {
736        if let Some(lock) = self
737            .lock_heartbeat
738            .try_lock_timeout(Duration::from_secs(2))
739            .await
740        {
741            if self.client_config.use_heartbeat_v2 {
742                self.send_heartbeat_to_all_broker_v2(is_rebalance).await
743            } else {
744                self.send_heartbeat_to_all_broker().await
745            }
746        } else {
747            warn!("lock heartBeat, but failed. [{}]", self.client_id);
748            false
749        }
750    }
751
752    pub fn get_mq_client_api_impl(&self) -> ArcMut<MQClientAPIImpl> {
753        self.mq_client_api_impl.as_ref().unwrap().clone()
754    }
755
756    pub async fn get_broker_name_from_message_queue(
757        &self,
758        message_queue: &MessageQueue,
759    ) -> CheetahString {
760        let guard = self.topic_end_points_table.read().await;
761        if let Some(broker_name) = guard.get(message_queue.get_topic()) {
762            if let Some(addr) = broker_name.get(message_queue) {
763                return addr.clone();
764            }
765        }
766        message_queue.get_broker_name().clone()
767    }
768
769    pub async fn find_broker_address_in_publish(
770        &self,
771        broker_name: &CheetahString,
772    ) -> Option<CheetahString> {
773        if broker_name.is_empty() {
774            return None;
775        }
776        let guard = self.broker_addr_table.read().await;
777        let map = guard.get(broker_name);
778        if let Some(map) = map {
779            return map.get(&(mix_all::MASTER_ID)).cloned();
780        }
781        None
782    }
783
784    async fn send_heartbeat_to_all_broker_v2(&self, is_rebalance: bool) -> bool {
785        unimplemented!()
786    }
787
788    async fn send_heartbeat_to_all_broker(&self) -> bool {
789        let heartbeat_data = self.prepare_heartbeat_data(false).await;
790        let producer_empty = heartbeat_data.producer_data_set.is_empty();
791        let consumer_empty = heartbeat_data.consumer_data_set.is_empty();
792        if producer_empty && consumer_empty {
793            warn!(
794                "sending heartbeat, but no consumer and no producer. [{}]",
795                self.client_id
796            );
797            return false;
798        }
799        let broker_addr_table = self.broker_addr_table.read().await;
800        if broker_addr_table.is_empty() {
801            return false;
802        }
803        for (broker_name, broker_addrs) in broker_addr_table.iter() {
804            if broker_addrs.is_empty() {
805                continue;
806            }
807            for (id, addr) in broker_addrs.iter() {
808                if addr.is_empty() {
809                    continue;
810                }
811                if consumer_empty && *id != mix_all::MASTER_ID {
812                    continue;
813                }
814                self.send_heartbeat_to_broker_inner(*id, broker_name, addr, &heartbeat_data)
815                    .await;
816            }
817        }
818
819        true
820    }
821
822    pub async fn send_heartbeat_to_broker(
823        &self,
824        id: u64,
825        broker_name: &CheetahString,
826        addr: &CheetahString,
827    ) -> bool {
828        if let Some(lock) = self.lock_heartbeat.try_lock().await {
829            let heartbeat_data = self.prepare_heartbeat_data(false).await;
830            let producer_empty = heartbeat_data.producer_data_set.is_empty();
831            let consumer_empty = heartbeat_data.consumer_data_set.is_empty();
832            if producer_empty && consumer_empty {
833                warn!(
834                    "sending heartbeat, but no consumer and no producer. [{}]",
835                    self.client_id
836                );
837                return false;
838            }
839
840            if self.client_config.use_heartbeat_v2 {
841                unimplemented!("sendHeartbeatToBrokerV2")
842            } else {
843                self.send_heartbeat_to_broker_inner(id, broker_name, addr, &heartbeat_data)
844                    .await
845            }
846        } else {
847            false
848        }
849    }
850
851    async fn send_heartbeat_to_broker_inner(
852        &self,
853        id: u64,
854        broker_name: &CheetahString,
855        addr: &CheetahString,
856        heartbeat_data: &HeartbeatData,
857    ) -> bool {
858        if let Ok(version) = self
859            .mq_client_api_impl
860            .as_ref()
861            .unwrap()
862            .mut_from_ref()
863            .send_heartbeat(
864                addr,
865                heartbeat_data,
866                self.client_config.mq_client_api_timeout,
867            )
868            .await
869        {
870            let mut broker_version_table = self.broker_version_table.write().await;
871            let map = broker_version_table.get_mut(broker_name);
872            if let Some(map) = map {
873                map.insert(addr.clone(), version);
874            } else {
875                let mut map = HashMap::new();
876                map.insert(addr.clone(), version);
877                broker_version_table.insert(broker_name.clone(), map);
878            }
879
880            let times = self
881                .send_heartbeat_times_total
882                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
883            if times % 20 == 0 {
884                info!(
885                    "send heart beat to broker[{} {} {}] success",
886                    broker_name, id, addr,
887                );
888            }
889            return true;
890        }
891        if self.is_broker_in_name_server(addr).await {
892            warn!(
893                "send heart beat to broker[{} {} {}] failed",
894                broker_name, id, addr
895            );
896        } else {
897            warn!(
898                "send heart beat to broker[{} {} {}] exception, because the broker not up, forget \
899                 it",
900                broker_name, id, addr
901            )
902        }
903        false
904    }
905
906    async fn is_broker_in_name_server(&self, broker_name: &str) -> bool {
907        let broker_addr_table = self.topic_route_table.read().await;
908        for (_, value) in broker_addr_table.iter() {
909            for bd in value.broker_datas.iter() {
910                for (_, value) in bd.broker_addrs().iter() {
911                    if value.as_str() == broker_name {
912                        return true;
913                    }
914                }
915            }
916        }
917        false
918    }
919
920    async fn prepare_heartbeat_data(&self, is_without_sub: bool) -> HeartbeatData {
921        let mut heartbeat_data = HeartbeatData {
922            client_id: self.client_id.clone(),
923            ..Default::default()
924        };
925
926        let consumer_table = self.consumer_table.read().await;
927        for (_, value) in consumer_table.iter() {
928            let mut consumer_data = ConsumerData {
929                group_name: value.group_name(),
930                consume_type: value.consume_type(),
931                message_model: value.message_model(),
932                consume_from_where: value.consume_from_where(),
933                subscription_data_set: value.subscriptions(),
934                unit_mode: value.is_unit_mode(),
935            };
936            if !is_without_sub {
937                value.subscriptions().iter().for_each(|sub| {
938                    consumer_data.subscription_data_set.insert(sub.clone());
939                });
940            }
941            heartbeat_data.consumer_data_set.insert(consumer_data);
942        }
943        drop(consumer_table);
944        let producer_table = self.producer_table.read().await;
945        for (group_name, _) in producer_table.iter() {
946            let producer_data = ProducerData {
947                group_name: group_name.clone(),
948            };
949            heartbeat_data.producer_data_set.insert(producer_data);
950        }
951        drop(producer_table);
952        heartbeat_data.is_without_sub = is_without_sub;
953        heartbeat_data
954    }
955
956    pub async fn register_consumer(
957        &mut self,
958        group: &CheetahString,
959        consumer: MQConsumerInnerImpl,
960    ) -> bool {
961        let mut consumer_table = self.consumer_table.write().await;
962        if consumer_table.contains_key(group) {
963            warn!("the consumer group[{}] exist already.", group);
964            return false;
965        }
966        consumer_table.insert(group.clone(), consumer);
967        true
968    }
969
970    pub async fn check_client_in_broker(&mut self) -> rocketmq_error::RocketMQResult<()> {
971        let consumer_table = self.consumer_table.read().await;
972        for (key, value) in consumer_table.iter() {
973            let subscription_inner = value.subscriptions();
974            if subscription_inner.is_empty() {
975                return Ok(());
976            }
977            for subscription_data in subscription_inner.iter() {
978                if ExpressionType::is_tag_type(Some(subscription_data.expression_type.as_str())) {
979                    continue;
980                }
981                let addr = self
982                    .find_broker_addr_by_topic(subscription_data.topic.as_str())
983                    .await;
984                if let Some(addr) = addr {
985                    match self
986                        .mq_client_api_impl
987                        .as_mut()
988                        .unwrap()
989                        .check_client_in_broker(
990                            addr.as_str(),
991                            key,
992                            self.client_id.as_str(),
993                            subscription_data,
994                            self.client_config.mq_client_api_timeout,
995                        )
996                        .await
997                    {
998                        Ok(_) => {}
999                        Err(e) => match e {
1000                            rocketmq_error::RocketmqError::MQClientErr(err) => {
1001                                return Err(rocketmq_error::RocketmqError::MQClientErr(err));
1002                            }
1003                            _ => {
1004                                let desc = format!(
1005                                    "Check client in broker error, maybe because you use {} to \
1006                                     filter message, but server has not been upgraded to \
1007                                     support!This error would not affect the launch of consumer, \
1008                                     but may has impact on message receiving if you have use the \
1009                                     new features which are not supported by server, please check \
1010                                     the log!",
1011                                    subscription_data.expression_type
1012                                );
1013                                return mq_client_err!(desc);
1014                            }
1015                        },
1016                    }
1017                }
1018            }
1019        }
1020
1021        Ok(())
1022    }
1023
1024    pub async fn do_rebalance(&mut self) -> bool {
1025        let mut balanced = true;
1026        let consumer_table = self.consumer_table.read().await;
1027        for (key, value) in consumer_table.iter() {
1028            match value.try_rebalance().await {
1029                Ok(result) => {
1030                    if !result {
1031                        balanced = false;
1032                    }
1033                }
1034                Err(e) => {
1035                    error!(
1036                        "doRebalance for consumer group [{}] exception:{}",
1037                        key,
1038                        e.to_string()
1039                    );
1040                }
1041            }
1042        }
1043        balanced
1044    }
1045
1046    pub fn rebalance_later(&mut self, delay_millis: u64) {
1047        if delay_millis == 0 {
1048            self.rebalance_service.wakeup();
1049        } else {
1050            let service = self.rebalance_service.clone();
1051            self.instance_runtime.get_handle().spawn(async move {
1052                tokio::time::sleep(Duration::from_millis(delay_millis)).await;
1053                service.wakeup();
1054            });
1055        }
1056    }
1057
1058    pub async fn find_broker_address_in_subscribe(
1059        &mut self,
1060        broker_name: &CheetahString,
1061        broker_id: u64,
1062        only_this_broker: bool,
1063    ) -> Option<FindBrokerResult> {
1064        if broker_name.is_empty() {
1065            return None;
1066        }
1067        let broker_addr_table = self.broker_addr_table.read().await;
1068        let map = broker_addr_table.get(broker_name);
1069        let mut broker_addr = None;
1070        let mut slave = false;
1071        let mut found = false;
1072
1073        if let Some(map) = map {
1074            broker_addr = map.get(&broker_id);
1075            slave = broker_id != mix_all::MASTER_ID;
1076            found = broker_addr.is_some();
1077            if !found && slave {
1078                broker_addr = map.get(&(broker_id + 1));
1079                found = broker_addr.is_some();
1080            }
1081            if !found && !only_this_broker {
1082                if let Some((key, value)) = map.iter().next() {
1083                    //broker_addr = Some(value.clone());
1084                    slave = *key != mix_all::MASTER_ID;
1085                    found = !value.is_empty();
1086                }
1087            }
1088        }
1089        if found {
1090            let broker_addr = broker_addr.cloned()?;
1091            let broker_version = self
1092                .find_broker_version(broker_name, broker_addr.as_str())
1093                .await;
1094            Some(FindBrokerResult {
1095                broker_addr,
1096                slave,
1097                broker_version,
1098            })
1099        } else {
1100            None
1101        }
1102    }
1103    async fn find_broker_version(&self, broker_name: &str, broker_addr: &str) -> i32 {
1104        let broker_version_table = self.broker_version_table.read().await;
1105        if let Some(map) = broker_version_table.get(broker_name) {
1106            if let Some(version) = map.get(broker_addr) {
1107                return *version;
1108            }
1109        }
1110        0
1111    }
1112
1113    pub async fn select_consumer(&self, group: &str) -> Option<MQConsumerInnerImpl> {
1114        let consumer_table = self.consumer_table.read().await;
1115        consumer_table.get(group).cloned()
1116    }
1117
1118    pub async fn select_producer(&self, group: &str) -> Option<MQProducerInnerImpl> {
1119        let producer_table = self.producer_table.read().await;
1120        producer_table.get(group).cloned()
1121    }
1122
1123    pub async fn unregister_consumer(&mut self, group: impl Into<CheetahString>) {
1124        self.unregister_client(None, Some(group.into())).await;
1125    }
1126    pub async fn unregister_producer(&mut self, group: impl Into<CheetahString>) {
1127        self.unregister_client(Some(group.into()), None).await;
1128    }
1129
1130    pub async fn unregister_admin_ext(&mut self, group: impl Into<CheetahString>) {
1131        let mut write_guard = self.admin_ext_table.write().await;
1132        let _ = write_guard.remove(&group.into());
1133    }
1134    async fn unregister_client(
1135        &mut self,
1136        producer_group: Option<CheetahString>,
1137        consumer_group: Option<CheetahString>,
1138    ) {
1139        let broker_addr_table = self.broker_addr_table.read().await;
1140        for (broker_name, broker_addrs) in broker_addr_table.iter() {
1141            for (id, addr) in broker_addrs.iter() {
1142                if let Err(err) = self
1143                    .mq_client_api_impl
1144                    .as_mut()
1145                    .unwrap()
1146                    .unregister_client(
1147                        addr,
1148                        self.client_id.clone(),
1149                        producer_group.clone(),
1150                        consumer_group.clone(),
1151                        self.client_config.mq_client_api_timeout,
1152                    )
1153                    .await
1154                {
1155                } else {
1156                    info!(
1157                        "unregister client[Producer: {:?} Consumer: {:?}] from broker[{} {} {}] \
1158                         success",
1159                        producer_group, consumer_group, broker_name, id, addr,
1160                    );
1161                }
1162            }
1163        }
1164    }
1165
1166    async fn is_broker_addr_exist_in_topic_route_table(&self, addr: &str) -> bool {
1167        let topic_route_table = self.topic_route_table.read().await;
1168        for (_, value) in topic_route_table.iter() {
1169            for bd in value.broker_datas.iter() {
1170                for (_, value) in bd.broker_addrs().iter() {
1171                    if value.as_str() == addr {
1172                        return true;
1173                    }
1174                }
1175            }
1176        }
1177        false
1178    }
1179
1180    /// Queries the assignment for a given topic.
1181    ///
1182    /// This function attempts to find the broker address for the specified topic. If the broker
1183    /// address is not found, it updates the topic route information from the name server and
1184    /// retries. If the broker address is found, it queries the assignment from the broker.
1185    ///
1186    /// # Arguments
1187    ///
1188    /// * `topic` - A reference to a `CheetahString` representing the topic to query.
1189    /// * `consumer_group` - A reference to a `CheetahString` representing the consumer group.
1190    /// * `strategy_name` - A reference to a `CheetahString` representing the allocation strategy
1191    ///   name.
1192    /// * `message_model` - The message model to use for the query.
1193    /// * `timeout` - The timeout duration for the query.
1194    ///
1195    /// # Returns
1196    ///
1197    /// A `Result` containing an `Option` with a `HashSet` of `MessageQueueAssignment` if the query
1198    /// is successful, or an error if it fails.
1199    pub async fn query_assignment(
1200        &mut self,
1201        topic: &CheetahString,
1202        consumer_group: &CheetahString,
1203        strategy_name: &CheetahString,
1204        message_model: MessageModel,
1205        timeout: u64,
1206    ) -> rocketmq_error::RocketMQResult<Option<HashSet<MessageQueueAssignment>>> {
1207        // Try to find broker address
1208        let mut broker_addr = self.find_broker_addr_by_topic(topic).await;
1209
1210        // If not found, update and retry
1211        if broker_addr.is_none() {
1212            self.update_topic_route_info_from_name_server_topic(topic)
1213                .await;
1214            broker_addr = self.find_broker_addr_by_topic(topic).await;
1215        }
1216        if let Some(broker_addr) = broker_addr {
1217            let client_id = self.client_id.clone();
1218            match self.mq_client_api_impl.as_mut() {
1219                Some(api_impl) => {
1220                    api_impl
1221                        .query_assignment(
1222                            &broker_addr,
1223                            topic.clone(),
1224                            consumer_group.clone(),
1225                            client_id,
1226                            strategy_name.clone(),
1227                            message_model,
1228                            timeout,
1229                        )
1230                        .await
1231                }
1232                None => mq_client_err!("mq_client_api_impl is None"),
1233            }
1234        } else {
1235            Ok(None)
1236        }
1237    }
1238
1239    pub async fn consume_message_directly(
1240        &self,
1241        message: MessageExt,
1242        consumer_group: &CheetahString,
1243        broker_name: Option<CheetahString>,
1244    ) -> Option<ConsumeMessageDirectlyResult> {
1245        let consumer_table = self.consumer_table.read().await;
1246        let consumer_inner = consumer_table.get(consumer_group);
1247        if let Some(consumer) = consumer_inner {
1248            consumer
1249                .consume_message_directly(message, broker_name)
1250                .await;
1251        }
1252
1253        None
1254    }
1255}
1256
1257pub fn topic_route_data2topic_publish_info(
1258    topic: &str,
1259    route: &mut TopicRouteData,
1260) -> TopicPublishInfo {
1261    let mut info = TopicPublishInfo {
1262        topic_route_data: Some(route.clone()),
1263        ..Default::default()
1264    };
1265    if route.order_topic_conf.is_some() && !route.order_topic_conf.as_ref().unwrap().is_empty() {
1266        let brokers = route
1267            .order_topic_conf
1268            .as_ref()
1269            .unwrap()
1270            .split(";")
1271            .map(|s| s.to_string())
1272            .collect::<Vec<String>>();
1273        for broker in brokers {
1274            let item = broker.split(":").collect::<Vec<&str>>();
1275            if item.len() == 2 {
1276                let queue_num = item[1].parse::<i32>().unwrap();
1277                for i in 0..queue_num {
1278                    let mq = MessageQueue::from_parts(topic, item[0], i);
1279                    info.message_queue_list.push(mq);
1280                }
1281            }
1282        }
1283        info.order_topic = true;
1284    } else if route.order_topic_conf.is_none()
1285        && route.topic_queue_mapping_by_broker.is_some()
1286        && !route
1287            .topic_queue_mapping_by_broker
1288            .as_ref()
1289            .unwrap()
1290            .is_empty()
1291    {
1292        info.order_topic = false;
1293        let mq_end_points =
1294            ClientMetadata::topic_route_data2endpoints_for_static_topic(topic, route);
1295        if let Some(mq_end_points) = mq_end_points {
1296            for (mq, broker_name) in mq_end_points {
1297                info.message_queue_list.push(mq);
1298            }
1299        }
1300        info.message_queue_list
1301            .sort_by(|a, b| match a.get_queue_id().cmp(&b.get_queue_id()) {
1302                Ordering::Less => std::cmp::Ordering::Less,
1303                Ordering::Equal => std::cmp::Ordering::Equal,
1304                Ordering::Greater => std::cmp::Ordering::Greater,
1305            });
1306    } else {
1307        route.queue_datas.sort();
1308        for queue_data in route.queue_datas.iter() {
1309            if PermName::is_writeable(queue_data.perm) {
1310                let mut broker_data = None;
1311                for bd in route.broker_datas.iter() {
1312                    if bd.broker_name() == queue_data.broker_name.as_str() {
1313                        broker_data = Some(bd.clone());
1314                        break;
1315                    }
1316                }
1317                if broker_data.is_none() {
1318                    continue;
1319                }
1320                if !broker_data
1321                    .as_ref()
1322                    .unwrap()
1323                    .broker_addrs()
1324                    .contains_key(&(mix_all::MASTER_ID))
1325                {
1326                    continue;
1327                }
1328                for i in 0..queue_data.write_queue_nums {
1329                    let mq =
1330                        MessageQueue::from_parts(topic, queue_data.broker_name.as_str(), i as i32);
1331                    info.message_queue_list.push(mq);
1332                }
1333            }
1334        }
1335    }
1336    info
1337}
1338
1339pub fn topic_route_data2topic_subscribe_info(
1340    topic: &str,
1341    route: &TopicRouteData,
1342) -> HashSet<MessageQueue> {
1343    if let Some(ref topic_queue_mapping_by_broker) = route.topic_queue_mapping_by_broker {
1344        if !topic_queue_mapping_by_broker.is_empty() {
1345            let mq_endpoints =
1346                ClientMetadata::topic_route_data2endpoints_for_static_topic(topic, route);
1347            return mq_endpoints
1348                .unwrap_or_default()
1349                .keys()
1350                .cloned()
1351                .collect::<HashSet<MessageQueue>>();
1352        }
1353    }
1354    let mut mq_list = HashSet::new();
1355    for qd in &route.queue_datas {
1356        if PermName::is_readable(qd.perm) {
1357            for i in 0..qd.read_queue_nums {
1358                let mq = MessageQueue::from_parts(topic, qd.broker_name.as_str(), i as i32);
1359                mq_list.insert(mq);
1360            }
1361        }
1362    }
1363    mq_list
1364}