rocketmq_client_rust/factory/
mq_client_instance.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::cmp::Ordering;
18use std::collections::HashMap;
19use std::collections::HashSet;
20use std::sync::atomic::AtomicI64;
21use std::sync::Arc;
22use std::thread;
23use std::time::Duration;
24
25use cheetah_string::CheetahString;
26use rand::seq::IndexedRandom;
27use rocketmq_common::common::base::service_state::ServiceState;
28use rocketmq_common::common::constant::PermName;
29use rocketmq_common::common::filter::expression_type::ExpressionType;
30use rocketmq_common::common::message::message_ext::MessageExt;
31use rocketmq_common::common::message::message_queue::MessageQueue;
32use rocketmq_common::common::message::message_queue_assignment::MessageQueueAssignment;
33use rocketmq_common::common::mix_all;
34use rocketmq_common::TimeUtils::get_current_millis;
35use rocketmq_remoting::base::connection_net_event::ConnectionNetEvent;
36use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;
37use rocketmq_remoting::protocol::heartbeat::consumer_data::ConsumerData;
38use rocketmq_remoting::protocol::heartbeat::heartbeat_data::HeartbeatData;
39use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
40use rocketmq_remoting::protocol::heartbeat::producer_data::ProducerData;
41use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData;
42use rocketmq_remoting::rpc::client_metadata::ClientMetadata;
43use rocketmq_remoting::runtime::config::client_config::TokioClientConfig;
44use rocketmq_remoting::runtime::RPCHook;
45use rocketmq_runtime::RocketMQRuntime;
46use rocketmq_rust::ArcMut;
47use rocketmq_rust::RocketMQTokioMutex;
48use tokio::runtime::Handle;
49use tokio::sync::RwLock;
50use tracing::error;
51use tracing::info;
52use tracing::warn;
53
54use crate::admin::mq_admin_ext_async_inner::MQAdminExtInnerImpl;
55use crate::base::client_config::ClientConfig;
56use crate::consumer::consumer_impl::pull_message_service::PullMessageService;
57use crate::consumer::consumer_impl::re_balance::rebalance_service::RebalanceService;
58use crate::consumer::mq_consumer_inner::MQConsumerInner;
59use crate::consumer::mq_consumer_inner::MQConsumerInnerImpl;
60use crate::implementation::client_remoting_processor::ClientRemotingProcessor;
61use crate::implementation::find_broker_result::FindBrokerResult;
62use crate::implementation::mq_admin_impl::MQAdminImpl;
63use crate::implementation::mq_client_api_impl::MQClientAPIImpl;
64use crate::producer::default_mq_producer::DefaultMQProducer;
65use crate::producer::default_mq_producer::ProducerConfig;
66use crate::producer::producer_impl::mq_producer_inner::MQProducerInnerImpl;
67use crate::producer::producer_impl::topic_publish_info::TopicPublishInfo;
68
/// Maximum time, in milliseconds, that `clean_offline_broker` waits to acquire `lock_namesrv`
/// before giving up.
const LOCK_TIMEOUT_MILLIS: u64 = 3000;
70
/// Per-client hub that owns the producer/consumer/admin registries, the cached topic routes and
/// broker addresses, and the background services started by `start`.
pub struct MQClientInstance {
    /// Client configuration this instance was created with.
    pub(crate) client_config: ArcMut<ClientConfig>,
    /// Identifier of this client; included in log messages.
    pub(crate) client_id: CheetahString,
    /// Value of `get_current_millis()` at construction time.
    boot_timestamp: u64,
    /// The container of the producer in the current client. The key is the name of
    /// producerGroup.
    producer_table: Arc<RwLock<HashMap<CheetahString, MQProducerInnerImpl>>>,
    /// The container of the consumer in the current client. The key is the name of
    /// consumer_group.
    consumer_table: Arc<RwLock<HashMap<CheetahString, MQConsumerInnerImpl>>>,
    /// The container of the adminExt in the current client. The key is the name of
    /// adminExtGroup.
    admin_ext_table: Arc<RwLock<HashMap<CheetahString, MQAdminExtInnerImpl>>>,
    /// Remoting API used to talk to name servers and brokers. `new_arc` creates the struct with
    /// `None` and fills it in before returning.
    pub(crate) mq_client_api_impl: Option<ArcMut<MQClientAPIImpl>>,
    /// Admin implementation; `new_arc` hands it a clone of this instance via `set_client`.
    pub(crate) mq_admin_impl: ArcMut<MQAdminImpl>,
    /// Most recently stored route data per topic.
    pub(crate) topic_route_table: Arc<RwLock<HashMap<CheetahString /* Topic */, TopicRouteData>>>,
    /// Per topic, the broker name each message queue maps to; filled from
    /// `ClientMetadata::topic_route_data2endpoints_for_static_topic`.
    topic_end_points_table: Arc<
        RwLock<
            HashMap<
                CheetahString, /* Topic */
                HashMap<MessageQueue, CheetahString /* brokerName */>,
            >,
        >,
    >,
    /// Taken by route updates and by offline-broker cleanup so the two do not run concurrently.
    lock_namesrv: Arc<RocketMQTokioMutex<()>>,
    /// Taken while heartbeats are sent to all brokers.
    lock_heartbeat: Arc<RocketMQTokioMutex<()>>,

    /// Lifecycle state; `start` moves it from `CreateJust` to `Running` (or leaves it at
    /// `StartFailed` when start-up aborts).
    service_state: ServiceState,
    /// Pull service started by `start`.
    pub(crate) pull_message_service: ArcMut<PullMessageService>,
    /// Rebalance service started by `start` and woken by the `re_balance_*` methods.
    rebalance_service: RebalanceService,
    /// Internal producer created with `mix_all::CLIENT_INNER_PRODUCER_GROUP`.
    pub(crate) default_producer: ArcMut<DefaultMQProducer>,
    /// Runtime on which the scheduled background tasks are spawned.
    instance_runtime: Arc<RocketMQRuntime>,
    /// Broker name -> (broker id -> broker address).
    broker_addr_table: Arc<RwLock<HashMap<CheetahString, HashMap<u64, CheetahString>>>>,
    /// Broker name -> (broker address -> `i32`).
    // NOTE(review): the `i32` is presumably the broker version -- confirm against the writers.
    broker_version_table: Arc<
        RwLock<
            HashMap<
                CheetahString, /* Broker Name */
                HashMap<CheetahString /* address */, i32>,
            >,
        >,
    >,
    /// Counter initialised to zero at construction.
    // NOTE(review): presumably counts heartbeat rounds -- the increment is not in this excerpt.
    send_heartbeat_times_total: Arc<AtomicI64>,
}
120
121impl MQClientInstance {
122    pub fn new(
123        client_config: ClientConfig,
124        instance_index: i32,
125        client_id: String,
126        rpc_hook: Option<Arc<Box<dyn RPCHook>>>,
127    ) -> Self {
128        /* let broker_addr_table = Arc::new(Default::default());
129        let (tx, _) = tokio::sync::broadcast::channel::<ConnectionNetEvent>(16);
130        let rx = tx.subscribe();
131        let mq_client_api_impl = ArcMut::new(MQClientAPIImpl::new(
132            Arc::new(TokioClientConfig::default()),
133            ClientRemotingProcessor::new(),
134            rpc_hook,
135            client_config.clone(),
136            Some(tx),
137        ));
138        if let Some(namesrv_addr) = client_config.namesrv_addr.as_deref() {
139            let handle = Handle::current();
140            let mq_client_api_impl_cloned = mq_client_api_impl.clone();
141            let namesrv_addr = namesrv_addr.to_string();
142            thread::spawn(move || {
143                handle.block_on(async move {
144                    mq_client_api_impl_cloned
145                        .update_name_server_address_list(namesrv_addr.as_str())
146                        .await;
147                })
148            });
149        }
150        let instance = MQClientInstance {
151            client_config: Arc::new(client_config.clone()),
152            client_id,
153            boot_timestamp: get_current_millis(),
154            producer_table: Arc::new(RwLock::new(HashMap::new())),
155            consumer_table: Arc::new(Default::default()),
156            admin_ext_table: Arc::new(Default::default()),
157            mq_client_api_impl:None,
158            mq_admin_impl: ArcMut::new(MQAdminImpl::new()),
159            topic_route_table: Arc::new(Default::default()),
160            topic_end_points_table: Arc::new(Default::default()),
161            lock_namesrv: Default::default(),
162            lock_heartbeat: Default::default(),
163            service_state: ServiceState::CreateJust,
164            pull_message_service: ArcMut::new(PullMessageService::new()),
165            rebalance_service: RebalanceService::new(),
166            default_producer: ArcMut::new(
167                DefaultMQProducer::builder()
168                    .producer_group(mix_all::CLIENT_INNER_PRODUCER_GROUP)
169                    .client_config(client_config.clone())
170                    .build(),
171            ),
172            instance_runtime: Arc::new(RocketMQRuntime::new_multi(
173                num_cpus::get(),
174                "mq-client-instance",
175            )),
176            broker_addr_table,
177            broker_version_table: Arc::new(Default::default()),
178            send_heartbeat_times_total: Arc::new(AtomicI64::new(0)),
179            tx: Some(rx),
180        };
181        // let instance_ = instance.clone();
182
183        instance*/
184        unimplemented!()
185    }
186
187    pub fn new_arc(
188        client_config: ClientConfig,
189        instance_index: i32,
190        client_id: impl Into<CheetahString>,
191        rpc_hook: Option<Arc<dyn RPCHook>>,
192    ) -> ArcMut<MQClientInstance> {
193        let broker_addr_table = Arc::new(Default::default());
194        let mut instance = ArcMut::new(MQClientInstance {
195            client_config: ArcMut::new(client_config.clone()),
196            client_id: client_id.into(),
197            boot_timestamp: get_current_millis(),
198            producer_table: Arc::new(RwLock::new(HashMap::new())),
199            consumer_table: Arc::new(Default::default()),
200            admin_ext_table: Arc::new(Default::default()),
201            mq_client_api_impl: None,
202            mq_admin_impl: ArcMut::new(MQAdminImpl::new()),
203            topic_route_table: Arc::new(Default::default()),
204            topic_end_points_table: Arc::new(Default::default()),
205            lock_namesrv: Default::default(),
206            lock_heartbeat: Default::default(),
207            service_state: ServiceState::CreateJust,
208            pull_message_service: ArcMut::new(PullMessageService::new()),
209            rebalance_service: RebalanceService::new(),
210            default_producer: ArcMut::new(
211                DefaultMQProducer::builder()
212                    .producer_group(mix_all::CLIENT_INNER_PRODUCER_GROUP)
213                    .client_config(client_config.clone())
214                    .build(),
215            ),
216            instance_runtime: Arc::new(RocketMQRuntime::new_multi(
217                num_cpus::get(),
218                "mq-client-instance",
219            )),
220            broker_addr_table,
221            broker_version_table: Arc::new(Default::default()),
222            send_heartbeat_times_total: Arc::new(AtomicI64::new(0)),
223        });
224        let instance_clone = instance.clone();
225        instance.mq_admin_impl.set_client(instance_clone);
226        let weak_instance = ArcMut::downgrade(&instance);
227        let (tx, mut rx) = tokio::sync::broadcast::channel::<ConnectionNetEvent>(16);
228
229        let mq_client_api_impl = ArcMut::new(MQClientAPIImpl::new(
230            Arc::new(TokioClientConfig::default()),
231            ClientRemotingProcessor::new(instance.clone()),
232            rpc_hook,
233            client_config.clone(),
234            Some(tx),
235        ));
236        instance.mq_client_api_impl = Some(mq_client_api_impl.clone());
237        if let Some(namesrv_addr) = client_config.namesrv_addr.as_deref() {
238            let handle = Handle::current();
239
240            let namesrv_addr = namesrv_addr.to_string();
241            thread::spawn(move || {
242                handle.block_on(async move {
243                    mq_client_api_impl
244                        .update_name_server_address_list(namesrv_addr.as_str())
245                        .await;
246                })
247            });
248        }
249        tokio::spawn(async move {
250            while let Ok(value) = rx.recv().await {
251                if let Some(instance_) = weak_instance.upgrade() {
252                    match value {
253                        ConnectionNetEvent::CONNECTED(remote_address) => {
254                            info!("ConnectionNetEvent CONNECTED");
255                            let broker_addr_table = instance_.broker_addr_table.read().await;
256                            for (broker_name, broker_addrs) in broker_addr_table.iter() {
257                                for (id, addr) in broker_addrs.iter() {
258                                    if addr == remote_address.to_string().as_str()
259                                        && instance_
260                                            .send_heartbeat_to_broker(*id, broker_name, addr)
261                                            .await
262                                    {
263                                        instance_.re_balance_immediately();
264                                    }
265                                }
266                            }
267                        }
268                        ConnectionNetEvent::DISCONNECTED => {}
269                        ConnectionNetEvent::EXCEPTION => {}
270                        ConnectionNetEvent::IDLE => {}
271                    }
272                }
273            }
274            warn!("ConnectionNetEvent recv error");
275        });
276        instance
277    }
278
    /// Wakes the rebalance service right away.
    pub fn re_balance_immediately(&self) {
        self.rebalance_service.wakeup();
    }
282
283    pub fn re_balance_later(&self, delay_millis: Duration) {
284        if delay_millis <= Duration::from_millis(0) {
285            self.rebalance_service.wakeup();
286        } else {
287            let service = self.rebalance_service.clone();
288            tokio::spawn(async move {
289                tokio::time::sleep(delay_millis).await;
290                service.wakeup();
291            });
292        }
293    }
294
295    pub async fn start(&mut self, this: ArcMut<Self>) -> rocketmq_error::RocketMQResult<()> {
296        match self.service_state {
297            ServiceState::CreateJust => {
298                self.service_state = ServiceState::StartFailed;
299                // If not specified,looking address from name remoting_server
300                if self.client_config.namesrv_addr.is_none() {
301                    self.mq_client_api_impl
302                        .as_mut()
303                        .expect("mq_client_api_impl is None")
304                        .fetch_name_server_addr()
305                        .await;
306                }
307                // Start request-response channel
308                self.mq_client_api_impl
309                    .as_mut()
310                    .expect("mq_client_api_impl is None")
311                    .start()
312                    .await;
313                // Start various schedule tasks
314                self.start_scheduled_task(this.clone());
315                // Start pull service
316                let instance = this.clone();
317                self.pull_message_service.start(instance).await;
318                // Start rebalance service
319                self.rebalance_service.start(this).await;
320                // Start push service
321
322                self.default_producer
323                    .default_mqproducer_impl
324                    .as_mut()
325                    .unwrap()
326                    .start_with_factory(false)
327                    .await?;
328                info!("the client factory[{}] start OK", self.client_id);
329                self.service_state = ServiceState::Running;
330            }
331            ServiceState::Running => {}
332            ServiceState::ShutdownAlready => {}
333            ServiceState::StartFailed => {
334                return Err(mq_client_err!(format!(
335                    "The Factory object[{}] has been created before, and failed.",
336                    self.client_id
337                )));
338            }
339        }
340        Ok(())
341    }
342
    /// Shuts the client instance down. Currently a no-op: the body is empty.
    pub async fn shutdown(&mut self) {}
344
345    pub async fn register_producer(&mut self, group: &str, producer: MQProducerInnerImpl) -> bool {
346        if group.is_empty() {
347            return false;
348        }
349        let mut producer_table = self.producer_table.write().await;
350        if producer_table.contains_key(group) {
351            warn!("the producer group[{}] exist already.", group);
352            return false;
353        }
354        producer_table.insert(group.into(), producer);
355        true
356    }
357
358    pub async fn register_admin_ext(&mut self, group: &str, admin: MQAdminExtInnerImpl) -> bool {
359        if group.is_empty() {
360            return false;
361        }
362        let mut admin_ext_table = self.admin_ext_table.write().await;
363        if admin_ext_table.contains_key(group) {
364            warn!("the admin group[{}] exist already.", group);
365            return false;
366        }
367        admin_ext_table.insert(group.into(), admin);
368        true
369    }
370
371    fn start_scheduled_task(&mut self, this: ArcMut<Self>) {
372        if self.client_config.namesrv_addr.is_none() {
373            // Fetch name server address
374            let mut mq_client_api_impl = self.mq_client_api_impl.as_ref().unwrap().clone();
375            self.instance_runtime.get_handle().spawn(async move {
376                info!("ScheduledTask fetchNameServerAddr started");
377                tokio::time::sleep(Duration::from_secs(10)).await;
378                loop {
379                    let current_execution_time = tokio::time::Instant::now();
380                    mq_client_api_impl.fetch_name_server_addr().await;
381                    let next_execution_time = current_execution_time + Duration::from_secs(120);
382                    let delay =
383                        next_execution_time.saturating_duration_since(tokio::time::Instant::now());
384                    tokio::time::sleep(delay).await;
385                }
386            });
387        }
388
389        // Update topic route info from name server
390        let mut client_instance = this.clone();
391        let poll_name_server_interval = self.client_config.poll_name_server_interval;
392        self.instance_runtime.get_handle().spawn(async move {
393            info!("ScheduledTask update_topic_route_info_from_name_server started");
394            tokio::time::sleep(Duration::from_millis(10)).await;
395            loop {
396                let current_execution_time = tokio::time::Instant::now();
397                client_instance
398                    .update_topic_route_info_from_name_server()
399                    .await;
400                let next_execution_time = current_execution_time
401                    + Duration::from_millis(poll_name_server_interval as u64);
402                let delay =
403                    next_execution_time.saturating_duration_since(tokio::time::Instant::now());
404                tokio::time::sleep(delay).await;
405            }
406        });
407
408        // Clean offline broker and send heartbeat to all broker
409        let mut client_instance = this.clone();
410        let heartbeat_broker_interval = self.client_config.heartbeat_broker_interval;
411        self.instance_runtime.get_handle().spawn(async move {
412            info!("ScheduledTask clean_offline_broker started");
413            tokio::time::sleep(Duration::from_secs(1)).await;
414            loop {
415                let current_execution_time = tokio::time::Instant::now();
416                client_instance.clean_offline_broker().await;
417                client_instance
418                    .send_heartbeat_to_all_broker_with_lock()
419                    .await;
420                let next_execution_time = current_execution_time
421                    + Duration::from_millis(heartbeat_broker_interval as u64);
422                let delay =
423                    next_execution_time.saturating_duration_since(tokio::time::Instant::now());
424                tokio::time::sleep(delay).await;
425            }
426        });
427
428        // Persist all consumer offset
429        let mut client_instance = this;
430        let persist_consumer_offset_interval =
431            self.client_config.persist_consumer_offset_interval as u64;
432        self.instance_runtime.get_handle().spawn(async move {
433            info!("ScheduledTask persistAllConsumerOffset started");
434            tokio::time::sleep(Duration::from_secs(10)).await;
435            loop {
436                let current_execution_time = tokio::time::Instant::now();
437                client_instance.persist_all_consumer_offset().await;
438                let next_execution_time = current_execution_time
439                    + Duration::from_millis(persist_consumer_offset_interval);
440                let delay =
441                    next_execution_time.saturating_duration_since(tokio::time::Instant::now());
442                tokio::time::sleep(delay).await;
443            }
444        });
445    }
446
447    pub async fn update_topic_route_info_from_name_server(&mut self) {
448        let mut topic_list = HashSet::new();
449
450        {
451            let producer_table = self.producer_table.read().await;
452            for (_, value) in producer_table.iter() {
453                topic_list.extend(value.get_publish_topic_list());
454            }
455        }
456
457        {
458            let consumer_table = self.consumer_table.read().await;
459            for (_, value) in consumer_table.iter() {
460                value.subscriptions().iter().for_each(|sub| {
461                    topic_list.insert(sub.topic.clone());
462                });
463            }
464        }
465
466        for topic in topic_list.iter() {
467            self.update_topic_route_info_from_name_server_topic(topic)
468                .await;
469        }
470    }
471
    /// Refreshes the route info of a single `topic`.
    ///
    /// Shorthand for `update_topic_route_info_from_name_server_default(topic, false, None)`;
    /// returns that call's result.
    #[inline]
    pub async fn update_topic_route_info_from_name_server_topic(
        &mut self,
        topic: &CheetahString,
    ) -> bool {
        self.update_topic_route_info_from_name_server_default(topic, false, None)
            .await
    }
480
481    pub async fn find_consumer_id_list(
482        &mut self,
483        topic: &CheetahString,
484        group: &CheetahString,
485    ) -> Option<Vec<CheetahString>> {
486        let mut broker_addr = self.find_broker_addr_by_topic(topic).await;
487        if broker_addr.is_none() {
488            self.update_topic_route_info_from_name_server_topic(topic)
489                .await;
490            broker_addr = self.find_broker_addr_by_topic(topic).await;
491        }
492        if let Some(broker_addr) = broker_addr {
493            match self
494                .mq_client_api_impl
495                .as_mut()
496                .unwrap()
497                .get_consumer_id_list_by_group(
498                    broker_addr.as_str(),
499                    group,
500                    self.client_config.mq_client_api_timeout,
501                )
502                .await
503            {
504                Ok(value) => return Some(value),
505                Err(e) => {
506                    warn!(
507                        "getConsumerIdListByGroup exception,{}  {}, err:{}",
508                        broker_addr,
509                        group,
510                        e.to_string()
511                    );
512                }
513            }
514        }
515        None
516    }
517
518    pub async fn find_broker_addr_by_topic(&self, topic: &str) -> Option<CheetahString> {
519        let topic_route_table = self.topic_route_table.read().await;
520        if let Some(topic_route_data) = topic_route_table.get(topic) {
521            let brokers = &topic_route_data.broker_datas;
522            if !brokers.is_empty() {
523                let bd = brokers.choose(&mut rand::rng());
524                if let Some(bd) = bd {
525                    return bd.select_broker_addr();
526                }
527            }
528        }
529        None
530    }
531
532    pub async fn update_topic_route_info_from_name_server_default(
533        &mut self,
534        topic: &CheetahString,
535        is_default: bool,
536        producer_config: Option<&Arc<ProducerConfig>>,
537    ) -> bool {
538        let lock = self.lock_namesrv.lock().await;
539        let topic_route_data = if let (true, Some(producer_config)) = (is_default, producer_config)
540        {
541            let mut result = match self
542                .mq_client_api_impl
543                .as_mut()
544                .unwrap()
545                .get_default_topic_route_info_from_name_server(
546                    self.client_config.mq_client_api_timeout,
547                )
548                .await
549            {
550                Ok(value) => value,
551                Err(e) => {
552                    error!(
553                        "get_default_topic_route_info_from_name_server failed, topic: {}, error: \
554                         {}",
555                        topic, e
556                    );
557                    None
558                }
559            };
560            if let Some(topic_route_data) = result.as_mut() {
561                for data in topic_route_data.queue_datas.iter_mut() {
562                    let queue_nums = producer_config
563                        .default_topic_queue_nums()
564                        .max(data.read_queue_nums);
565                    data.read_queue_nums = queue_nums;
566                    data.write_queue_nums = queue_nums;
567                }
568            }
569            result
570        } else {
571            self.mq_client_api_impl
572                .as_mut()
573                .unwrap()
574                .get_topic_route_info_from_name_server(
575                    topic,
576                    self.client_config.mq_client_api_timeout,
577                )
578                .await
579                .unwrap_or(None)
580        };
581        if let Some(mut topic_route_data) = topic_route_data {
582            let mut topic_route_table = self.topic_route_table.write().await;
583            let old = topic_route_table.get(topic);
584            let mut changed = topic_route_data.topic_route_data_changed(old);
585            if !changed {
586                changed = self.is_need_update_topic_route_info(topic).await;
587            } else {
588                info!(
589                    "the topic[{}] route info changed, old[{:?}] ,new[{:?}]",
590                    topic, old, topic_route_data
591                )
592            }
593            if changed {
594                let mut broker_addr_table = self.broker_addr_table.write().await;
595                for bd in topic_route_data.broker_datas.iter() {
596                    broker_addr_table.insert(bd.broker_name().clone(), bd.broker_addrs().clone());
597                }
598                drop(broker_addr_table);
599
600                // Update endpoint map
601                {
602                    let mq_end_points = ClientMetadata::topic_route_data2endpoints_for_static_topic(
603                        topic,
604                        &topic_route_data,
605                    );
606                    if let Some(mq_end_points) = mq_end_points {
607                        if !mq_end_points.is_empty() {
608                            let mut topic_end_points_table =
609                                self.topic_end_points_table.write().await;
610                            topic_end_points_table.insert(topic.into(), mq_end_points);
611                        }
612                    }
613                }
614
615                // Update Pub info
616                {
617                    let mut publish_info =
618                        topic_route_data2topic_publish_info(topic, &mut topic_route_data);
619                    publish_info.have_topic_router_info = true;
620                    let mut producer_table = self.producer_table.write().await;
621                    for (_, value) in producer_table.iter_mut() {
622                        value.update_topic_publish_info(
623                            topic.to_string(),
624                            Some(publish_info.clone()),
625                        );
626                    }
627                }
628
629                // Update sub info
630                {
631                    let consumer_table = self.consumer_table.read().await;
632                    if !consumer_table.is_empty() {
633                        let subscribe_info =
634                            topic_route_data2topic_subscribe_info(topic, &topic_route_data);
635                        for (_, value) in consumer_table.iter() {
636                            value
637                                .update_topic_subscribe_info(topic.clone(), &subscribe_info)
638                                .await;
639                        }
640                    }
641                }
642                let clone_topic_route_data = TopicRouteData::from_existing(&topic_route_data);
643                topic_route_table.insert(topic.clone(), clone_topic_route_data);
644                return true;
645            }
646        } else {
647            warn!(
648                "updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, \
649                 Topic: {}. [{}]",
650                topic, self.client_id
651            );
652        }
653
654        drop(lock);
655        false
656    }
657
658    async fn is_need_update_topic_route_info(&self, topic: &CheetahString) -> bool {
659        let mut result = false;
660        let producer_table = self.producer_table.read().await;
661        for (key, value) in producer_table.iter() {
662            if !result {
663                result = value.is_publish_topic_need_update(topic);
664                break;
665            }
666        }
667        if result {
668            return true;
669        }
670
671        let consumer_table = self.consumer_table.read().await;
672        for (key, value) in consumer_table.iter() {
673            if !result {
674                result = value.is_subscribe_topic_need_update(topic).await;
675                break;
676            }
677        }
678        result
679    }
680
681    pub async fn persist_all_consumer_offset(&mut self) {
682        let consumer_table = self.consumer_table.read().await;
683        for (_, value) in consumer_table.iter() {
684            value.persist_consumer_offset().await;
685        }
686    }
687
688    pub async fn clean_offline_broker(&mut self) {
689        let lock = self
690            .lock_namesrv
691            .try_lock_timeout(Duration::from_millis(LOCK_TIMEOUT_MILLIS))
692            .await;
693        if let Some(lock) = lock {
694            let mut broker_addr_table = self.broker_addr_table.write().await;
695            let mut updated_table = HashMap::with_capacity(broker_addr_table.len());
696            let mut broker_name_set = HashSet::new();
697            for (broker_name, one_table) in broker_addr_table.iter() {
698                let mut clone_addr_table = one_table.clone();
699                let mut remove_id_set = HashSet::new();
700                for (id, addr) in one_table.iter() {
701                    if !self.is_broker_addr_exist_in_topic_route_table(addr).await {
702                        remove_id_set.insert(*id);
703                    }
704                }
705                clone_addr_table.retain(|k, _| !remove_id_set.contains(k));
706                if clone_addr_table.is_empty() {
707                    info!(
708                        "the broker[{}] name's host is offline, remove it",
709                        broker_name
710                    );
711                    broker_name_set.insert(broker_name.clone());
712                } else {
713                    updated_table.insert(broker_name.clone(), clone_addr_table);
714                }
715            }
716            broker_addr_table.retain(|k, _| !broker_name_set.contains(k));
717            if !updated_table.is_empty() {
718                broker_addr_table.extend(updated_table);
719            }
720        }
721    }
722    pub async fn send_heartbeat_to_all_broker_with_lock(&mut self) -> bool {
723        if let Some(lock) = self.lock_heartbeat.try_lock().await {
724            if self.client_config.use_heartbeat_v2 {
725                self.send_heartbeat_to_all_broker_v2(false).await
726            } else {
727                self.send_heartbeat_to_all_broker().await
728            }
729        } else {
730            warn!("lock heartBeat, but failed. [{}]", self.client_id);
731            false
732        }
733    }
734
735    pub async fn send_heartbeat_to_all_broker_with_lock_v2(&mut self, is_rebalance: bool) -> bool {
736        if let Some(lock) = self
737            .lock_heartbeat
738            .try_lock_timeout(Duration::from_secs(2))
739            .await
740        {
741            if self.client_config.use_heartbeat_v2 {
742                self.send_heartbeat_to_all_broker_v2(is_rebalance).await
743            } else {
744                self.send_heartbeat_to_all_broker().await
745            }
746        } else {
747            warn!("lock heartBeat, but failed. [{}]", self.client_id);
748            false
749        }
750    }
751
752    pub fn get_mq_client_api_impl(&self) -> ArcMut<MQClientAPIImpl> {
753        self.mq_client_api_impl.as_ref().unwrap().clone()
754    }
755
756    pub async fn get_broker_name_from_message_queue(
757        &self,
758        message_queue: &MessageQueue,
759    ) -> CheetahString {
760        let guard = self.topic_end_points_table.read().await;
761        if let Some(broker_name) = guard.get(message_queue.get_topic()) {
762            if let Some(addr) = broker_name.get(message_queue) {
763                return addr.clone();
764            }
765        }
766        message_queue.get_broker_name().clone()
767    }
768
769    pub async fn find_broker_address_in_publish(
770        &self,
771        broker_name: &CheetahString,
772    ) -> Option<CheetahString> {
773        if broker_name.is_empty() {
774            return None;
775        }
776        let guard = self.broker_addr_table.read().await;
777        let map = guard.get(broker_name);
778        if let Some(map) = map {
779            return map.get(&(mix_all::MASTER_ID)).cloned();
780        }
781        None
782    }
783
    /// Placeholder for the V2 heartbeat broadcast; the body is not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!()`. The `*_with_lock` heartbeat entry points call
    /// this whenever `client_config.use_heartbeat_v2` is set, so enabling that flag currently
    /// makes those calls panic.
    async fn send_heartbeat_to_all_broker_v2(&self, is_rebalance: bool) -> bool {
        unimplemented!()
    }
787
788    async fn send_heartbeat_to_all_broker(&self) -> bool {
789        let heartbeat_data = self.prepare_heartbeat_data(false).await;
790        let producer_empty = heartbeat_data.producer_data_set.is_empty();
791        let consumer_empty = heartbeat_data.consumer_data_set.is_empty();
792        if producer_empty && consumer_empty {
793            warn!(
794                "sending heartbeat, but no consumer and no producer. [{}]",
795                self.client_id
796            );
797            return false;
798        }
799        let broker_addr_table = self.broker_addr_table.read().await;
800        if broker_addr_table.is_empty() {
801            return false;
802        }
803        for (broker_name, broker_addrs) in broker_addr_table.iter() {
804            if broker_addrs.is_empty() {
805                continue;
806            }
807            for (id, addr) in broker_addrs.iter() {
808                if addr.is_empty() {
809                    continue;
810                }
811                if consumer_empty && *id != mix_all::MASTER_ID {
812                    continue;
813                }
814                self.send_heartbeat_to_broker_inner(*id, broker_name, addr, &heartbeat_data)
815                    .await;
816            }
817        }
818
819        true
820    }
821
822    pub async fn send_heartbeat_to_broker(
823        &self,
824        id: u64,
825        broker_name: &CheetahString,
826        addr: &CheetahString,
827    ) -> bool {
828        if let Some(lock) = self.lock_heartbeat.try_lock().await {
829            let heartbeat_data = self.prepare_heartbeat_data(false).await;
830            let producer_empty = heartbeat_data.producer_data_set.is_empty();
831            let consumer_empty = heartbeat_data.consumer_data_set.is_empty();
832            if producer_empty && consumer_empty {
833                warn!(
834                    "sending heartbeat, but no consumer and no producer. [{}]",
835                    self.client_id
836                );
837                return false;
838            }
839
840            if self.client_config.use_heartbeat_v2 {
841                unimplemented!("sendHeartbeatToBrokerV2")
842            } else {
843                self.send_heartbeat_to_broker_inner(id, broker_name, addr, &heartbeat_data)
844                    .await
845            }
846        } else {
847            false
848        }
849    }
850
851    async fn send_heartbeat_to_broker_inner(
852        &self,
853        id: u64,
854        broker_name: &CheetahString,
855        addr: &CheetahString,
856        heartbeat_data: &HeartbeatData,
857    ) -> bool {
858        if let Ok(version) = self
859            .mq_client_api_impl
860            .as_ref()
861            .unwrap()
862            .mut_from_ref()
863            .send_heartbeat(
864                addr,
865                heartbeat_data,
866                self.client_config.mq_client_api_timeout,
867            )
868            .await
869        {
870            let mut broker_version_table = self.broker_version_table.write().await;
871            let map = broker_version_table.get_mut(broker_name);
872            if let Some(map) = map {
873                map.insert(addr.clone(), version);
874            } else {
875                let mut map = HashMap::new();
876                map.insert(addr.clone(), version);
877                broker_version_table.insert(broker_name.clone(), map);
878            }
879
880            let times = self
881                .send_heartbeat_times_total
882                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
883            if times % 20 == 0 {
884                info!(
885                    "send heart beat to broker[{} {} {}] success",
886                    broker_name, id, addr,
887                );
888            }
889            return true;
890        }
891        if self.is_broker_in_name_server(addr).await {
892            warn!(
893                "send heart beat to broker[{} {} {}] failed",
894                broker_name, id, addr
895            );
896        } else {
897            warn!(
898                "send heart beat to broker[{} {} {}] exception, because the broker not up, forget \
899                 it",
900                broker_name, id, addr
901            )
902        }
903        false
904    }
905
906    async fn is_broker_in_name_server(&self, broker_name: &str) -> bool {
907        let broker_addr_table = self.topic_route_table.read().await;
908        for (_, value) in broker_addr_table.iter() {
909            for bd in value.broker_datas.iter() {
910                for (_, value) in bd.broker_addrs().iter() {
911                    if value.as_str() == broker_name {
912                        return true;
913                    }
914                }
915            }
916        }
917        false
918    }
919
920    async fn prepare_heartbeat_data(&self, is_without_sub: bool) -> HeartbeatData {
921        let mut heartbeat_data = HeartbeatData {
922            client_id: self.client_id.clone(),
923            ..Default::default()
924        };
925
926        let consumer_table = self.consumer_table.read().await;
927        for (_, value) in consumer_table.iter() {
928            let mut consumer_data = ConsumerData {
929                group_name: value.group_name(),
930                consume_type: value.consume_type(),
931                message_model: value.message_model(),
932                consume_from_where: value.consume_from_where(),
933                subscription_data_set: value.subscriptions(),
934                unit_mode: value.is_unit_mode(),
935            };
936            if !is_without_sub {
937                value.subscriptions().iter().for_each(|sub| {
938                    consumer_data.subscription_data_set.insert(sub.clone());
939                });
940            }
941            heartbeat_data.consumer_data_set.insert(consumer_data);
942        }
943        drop(consumer_table);
944        let producer_table = self.producer_table.read().await;
945        for (group_name, _) in producer_table.iter() {
946            let producer_data = ProducerData {
947                group_name: group_name.clone(),
948            };
949            heartbeat_data.producer_data_set.insert(producer_data);
950        }
951        drop(producer_table);
952        heartbeat_data.is_without_sub = is_without_sub;
953        heartbeat_data
954    }
955
956    pub async fn register_consumer(
957        &mut self,
958        group: &CheetahString,
959        consumer: MQConsumerInnerImpl,
960    ) -> bool {
961        let mut consumer_table = self.consumer_table.write().await;
962        if consumer_table.contains_key(group) {
963            warn!("the consumer group[{}] exist already.", group);
964            return false;
965        }
966        consumer_table.insert(group.clone(), consumer);
967        true
968    }
969
970    pub async fn check_client_in_broker(&mut self) -> rocketmq_error::RocketMQResult<()> {
971        let consumer_table = self.consumer_table.read().await;
972        for (key, value) in consumer_table.iter() {
973            let subscription_inner = value.subscriptions();
974            if subscription_inner.is_empty() {
975                return Ok(());
976            }
977            for subscription_data in subscription_inner.iter() {
978                if ExpressionType::is_tag_type(Some(subscription_data.expression_type.as_str())) {
979                    continue;
980                }
981                let addr = self
982                    .find_broker_addr_by_topic(subscription_data.topic.as_str())
983                    .await;
984                if let Some(addr) = addr {
985                    match self
986                        .mq_client_api_impl
987                        .as_mut()
988                        .unwrap()
989                        .check_client_in_broker(
990                            addr.as_str(),
991                            key,
992                            self.client_id.as_str(),
993                            subscription_data,
994                            self.client_config.mq_client_api_timeout,
995                        )
996                        .await
997                    {
998                        Ok(_) => {}
999                        Err(e) => match e {
1000                            rocketmq_error::RocketMQError::IllegalArgument(_) => {
1001                                return Err(e);
1002                            }
1003                            _ => {
1004                                let desc = format!(
1005                                    "Check client in broker error, maybe because you use {} to \
1006                                     filter message, but server has not been upgraded to \
1007                                     support!This error would not affect the launch of consumer, \
1008                                     but may has impact on message receiving if you have use the \
1009                                     new features which are not supported by server, please check \
1010                                     the log!",
1011                                    subscription_data.expression_type
1012                                );
1013                            }
1014                        },
1015                    }
1016                }
1017            }
1018        }
1019
1020        Ok(())
1021    }
1022
1023    pub async fn do_rebalance(&mut self) -> bool {
1024        let mut balanced = true;
1025        let consumer_table = self.consumer_table.read().await;
1026        for (key, value) in consumer_table.iter() {
1027            match value.try_rebalance().await {
1028                Ok(result) => {
1029                    if !result {
1030                        balanced = false;
1031                    }
1032                }
1033                Err(e) => {
1034                    error!(
1035                        "doRebalance for consumer group [{}] exception:{}",
1036                        key,
1037                        e.to_string()
1038                    );
1039                }
1040            }
1041        }
1042        balanced
1043    }
1044
1045    pub fn rebalance_later(&mut self, delay_millis: u64) {
1046        if delay_millis == 0 {
1047            self.rebalance_service.wakeup();
1048        } else {
1049            let service = self.rebalance_service.clone();
1050            self.instance_runtime.get_handle().spawn(async move {
1051                tokio::time::sleep(Duration::from_millis(delay_millis)).await;
1052                service.wakeup();
1053            });
1054        }
1055    }
1056
1057    pub async fn find_broker_address_in_subscribe(
1058        &mut self,
1059        broker_name: &CheetahString,
1060        broker_id: u64,
1061        only_this_broker: bool,
1062    ) -> Option<FindBrokerResult> {
1063        if broker_name.is_empty() {
1064            return None;
1065        }
1066        let broker_addr_table = self.broker_addr_table.read().await;
1067        let map = broker_addr_table.get(broker_name);
1068        let mut broker_addr = None;
1069        let mut slave = false;
1070        let mut found = false;
1071
1072        if let Some(map) = map {
1073            broker_addr = map.get(&broker_id);
1074            slave = broker_id != mix_all::MASTER_ID;
1075            found = broker_addr.is_some();
1076            if !found && slave {
1077                broker_addr = map.get(&(broker_id + 1));
1078                found = broker_addr.is_some();
1079            }
1080            if !found && !only_this_broker {
1081                if let Some((key, value)) = map.iter().next() {
1082                    //broker_addr = Some(value.clone());
1083                    slave = *key != mix_all::MASTER_ID;
1084                    found = !value.is_empty();
1085                }
1086            }
1087        }
1088        if found {
1089            let broker_addr = broker_addr.cloned()?;
1090            let broker_version = self
1091                .find_broker_version(broker_name, broker_addr.as_str())
1092                .await;
1093            Some(FindBrokerResult {
1094                broker_addr,
1095                slave,
1096                broker_version,
1097            })
1098        } else {
1099            None
1100        }
1101    }
1102    async fn find_broker_version(&self, broker_name: &str, broker_addr: &str) -> i32 {
1103        let broker_version_table = self.broker_version_table.read().await;
1104        if let Some(map) = broker_version_table.get(broker_name) {
1105            if let Some(version) = map.get(broker_addr) {
1106                return *version;
1107            }
1108        }
1109        0
1110    }
1111
1112    pub async fn select_consumer(&self, group: &str) -> Option<MQConsumerInnerImpl> {
1113        let consumer_table = self.consumer_table.read().await;
1114        consumer_table.get(group).cloned()
1115    }
1116
1117    pub async fn select_producer(&self, group: &str) -> Option<MQProducerInnerImpl> {
1118        let producer_table = self.producer_table.read().await;
1119        producer_table.get(group).cloned()
1120    }
1121
1122    pub async fn unregister_consumer(&mut self, group: impl Into<CheetahString>) {
1123        self.unregister_client(None, Some(group.into())).await;
1124    }
1125    pub async fn unregister_producer(&mut self, group: impl Into<CheetahString>) {
1126        self.unregister_client(Some(group.into()), None).await;
1127    }
1128
1129    pub async fn unregister_admin_ext(&mut self, group: impl Into<CheetahString>) {
1130        let mut write_guard = self.admin_ext_table.write().await;
1131        let _ = write_guard.remove(&group.into());
1132    }
1133    async fn unregister_client(
1134        &mut self,
1135        producer_group: Option<CheetahString>,
1136        consumer_group: Option<CheetahString>,
1137    ) {
1138        let broker_addr_table = self.broker_addr_table.read().await;
1139        for (broker_name, broker_addrs) in broker_addr_table.iter() {
1140            for (id, addr) in broker_addrs.iter() {
1141                if let Err(err) = self
1142                    .mq_client_api_impl
1143                    .as_mut()
1144                    .unwrap()
1145                    .unregister_client(
1146                        addr,
1147                        self.client_id.clone(),
1148                        producer_group.clone(),
1149                        consumer_group.clone(),
1150                        self.client_config.mq_client_api_timeout,
1151                    )
1152                    .await
1153                {
1154                } else {
1155                    info!(
1156                        "unregister client[Producer: {:?} Consumer: {:?}] from broker[{} {} {}] \
1157                         success",
1158                        producer_group, consumer_group, broker_name, id, addr,
1159                    );
1160                }
1161            }
1162        }
1163    }
1164
1165    async fn is_broker_addr_exist_in_topic_route_table(&self, addr: &str) -> bool {
1166        let topic_route_table = self.topic_route_table.read().await;
1167        for (_, value) in topic_route_table.iter() {
1168            for bd in value.broker_datas.iter() {
1169                for (_, value) in bd.broker_addrs().iter() {
1170                    if value.as_str() == addr {
1171                        return true;
1172                    }
1173                }
1174            }
1175        }
1176        false
1177    }
1178
1179    /// Queries the assignment for a given topic.
1180    ///
1181    /// This function attempts to find the broker address for the specified topic. If the broker
1182    /// address is not found, it updates the topic route information from the name server and
1183    /// retries. If the broker address is found, it queries the assignment from the broker.
1184    ///
1185    /// # Arguments
1186    ///
1187    /// * `topic` - A reference to a `CheetahString` representing the topic to query.
1188    /// * `consumer_group` - A reference to a `CheetahString` representing the consumer group.
1189    /// * `strategy_name` - A reference to a `CheetahString` representing the allocation strategy
1190    ///   name.
1191    /// * `message_model` - The message model to use for the query.
1192    /// * `timeout` - The timeout duration for the query.
1193    ///
1194    /// # Returns
1195    ///
1196    /// A `Result` containing an `Option` with a `HashSet` of `MessageQueueAssignment` if the query
1197    /// is successful, or an error if it fails.
1198    pub async fn query_assignment(
1199        &mut self,
1200        topic: &CheetahString,
1201        consumer_group: &CheetahString,
1202        strategy_name: &CheetahString,
1203        message_model: MessageModel,
1204        timeout: u64,
1205    ) -> rocketmq_error::RocketMQResult<Option<HashSet<MessageQueueAssignment>>> {
1206        // Try to find broker address
1207        let mut broker_addr = self.find_broker_addr_by_topic(topic).await;
1208
1209        // If not found, update and retry
1210        if broker_addr.is_none() {
1211            self.update_topic_route_info_from_name_server_topic(topic)
1212                .await;
1213            broker_addr = self.find_broker_addr_by_topic(topic).await;
1214        }
1215        if let Some(broker_addr) = broker_addr {
1216            let client_id = self.client_id.clone();
1217            match self.mq_client_api_impl.as_mut() {
1218                Some(api_impl) => {
1219                    api_impl
1220                        .query_assignment(
1221                            &broker_addr,
1222                            topic.clone(),
1223                            consumer_group.clone(),
1224                            client_id,
1225                            strategy_name.clone(),
1226                            message_model,
1227                            timeout,
1228                        )
1229                        .await
1230                }
1231                None => Err(mq_client_err!("mq_client_api_impl is None")),
1232            }
1233        } else {
1234            Ok(None)
1235        }
1236    }
1237
1238    pub async fn consume_message_directly(
1239        &self,
1240        message: MessageExt,
1241        consumer_group: &CheetahString,
1242        broker_name: Option<CheetahString>,
1243    ) -> Option<ConsumeMessageDirectlyResult> {
1244        let consumer_table = self.consumer_table.read().await;
1245        let consumer_inner = consumer_table.get(consumer_group);
1246        if let Some(consumer) = consumer_inner {
1247            consumer
1248                .consume_message_directly(message, broker_name)
1249                .await;
1250        }
1251
1252        None
1253    }
1254}
1255
1256#[allow(clippy::unnecessary_unwrap)]
1257pub fn topic_route_data2topic_publish_info(
1258    topic: &str,
1259    route: &mut TopicRouteData,
1260) -> TopicPublishInfo {
1261    let mut info = TopicPublishInfo {
1262        topic_route_data: Some(route.clone()),
1263        ..Default::default()
1264    };
1265    if route.order_topic_conf.is_some() && !route.order_topic_conf.as_ref().unwrap().is_empty() {
1266        let brokers = route
1267            .order_topic_conf
1268            .as_ref()
1269            .unwrap()
1270            .split(";")
1271            .map(|s| s.to_string())
1272            .collect::<Vec<String>>();
1273        for broker in brokers {
1274            let item = broker.split(":").collect::<Vec<&str>>();
1275            if item.len() == 2 {
1276                let queue_num = item[1].parse::<i32>().unwrap();
1277                for i in 0..queue_num {
1278                    let mq = MessageQueue::from_parts(topic, item[0], i);
1279                    info.message_queue_list.push(mq);
1280                }
1281            }
1282        }
1283        info.order_topic = true;
1284    } else if route.order_topic_conf.is_none()
1285        && route.topic_queue_mapping_by_broker.is_some()
1286        && !route
1287            .topic_queue_mapping_by_broker
1288            .as_ref()
1289            .unwrap()
1290            .is_empty()
1291    {
1292        info.order_topic = false;
1293        let mq_end_points =
1294            ClientMetadata::topic_route_data2endpoints_for_static_topic(topic, route);
1295        if let Some(mq_end_points) = mq_end_points {
1296            for (mq, broker_name) in mq_end_points {
1297                info.message_queue_list.push(mq);
1298            }
1299        }
1300        info.message_queue_list
1301            .sort_by(|a, b| match a.get_queue_id().cmp(&b.get_queue_id()) {
1302                Ordering::Less => std::cmp::Ordering::Less,
1303                Ordering::Equal => std::cmp::Ordering::Equal,
1304                Ordering::Greater => std::cmp::Ordering::Greater,
1305            });
1306    } else {
1307        route.queue_datas.sort();
1308        for queue_data in route.queue_datas.iter() {
1309            if PermName::is_writeable(queue_data.perm) {
1310                let mut broker_data = None;
1311                for bd in route.broker_datas.iter() {
1312                    if bd.broker_name() == queue_data.broker_name.as_str() {
1313                        broker_data = Some(bd.clone());
1314                        break;
1315                    }
1316                }
1317                if broker_data.is_none() {
1318                    continue;
1319                }
1320                if !broker_data
1321                    .as_ref()
1322                    .unwrap()
1323                    .broker_addrs()
1324                    .contains_key(&(mix_all::MASTER_ID))
1325                {
1326                    continue;
1327                }
1328                for i in 0..queue_data.write_queue_nums {
1329                    let mq =
1330                        MessageQueue::from_parts(topic, queue_data.broker_name.as_str(), i as i32);
1331                    info.message_queue_list.push(mq);
1332                }
1333            }
1334        }
1335    }
1336    info
1337}
1338
1339pub fn topic_route_data2topic_subscribe_info(
1340    topic: &str,
1341    route: &TopicRouteData,
1342) -> HashSet<MessageQueue> {
1343    if let Some(ref topic_queue_mapping_by_broker) = route.topic_queue_mapping_by_broker {
1344        if !topic_queue_mapping_by_broker.is_empty() {
1345            let mq_endpoints =
1346                ClientMetadata::topic_route_data2endpoints_for_static_topic(topic, route);
1347            return mq_endpoints
1348                .unwrap_or_default()
1349                .keys()
1350                .cloned()
1351                .collect::<HashSet<MessageQueue>>();
1352        }
1353    }
1354    let mut mq_list = HashSet::new();
1355    for qd in &route.queue_datas {
1356        if PermName::is_readable(qd.perm) {
1357            for i in 0..qd.read_queue_nums {
1358                let mq = MessageQueue::from_parts(topic, qd.broker_name.as_str(), i as i32);
1359                mq_list.insert(mq);
1360            }
1361        }
1362    }
1363    mq_list
1364}