rocketmq_client_rust/admin/
default_mq_admin_ext_impl.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18#![allow(dead_code)]
19use std::collections::HashMap;
20use std::collections::HashSet;
21use std::env;
22use std::sync::Arc;
23use std::time::Duration;
24
25use cheetah_string::CheetahString;
26use lazy_static::lazy_static;
27use rocketmq_common::common::base::plain_access_config::PlainAccessConfig;
28use rocketmq_common::common::base::service_state::ServiceState;
29use rocketmq_common::common::config::TopicConfig;
30use rocketmq_common::common::message::message_enum::MessageRequestMode;
31use rocketmq_common::common::message::message_queue::MessageQueue;
32use rocketmq_common::common::mix_all;
33use rocketmq_common::common::FAQUrl;
34use rocketmq_remoting::protocol::admin::consume_stats::ConsumeStats;
35use rocketmq_remoting::protocol::admin::topic_stats_table::TopicStatsTable;
36use rocketmq_remoting::protocol::body::broker_body::cluster_info::ClusterInfo;
37use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;
38use rocketmq_remoting::protocol::body::consumer_connection::ConsumerConnection;
39use rocketmq_remoting::protocol::body::consumer_running_info::ConsumerRunningInfo;
40use rocketmq_remoting::protocol::body::group_list::GroupList;
41use rocketmq_remoting::protocol::body::kv_table::KVTable;
42use rocketmq_remoting::protocol::body::producer_connection::ProducerConnection;
43use rocketmq_remoting::protocol::body::topic::topic_list::TopicList;
44use rocketmq_remoting::protocol::body::topic_info_wrapper::TopicConfigSerializeWrapper;
45use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
46use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData;
47use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_detail::TopicQueueMappingDetail;
48use rocketmq_remoting::protocol::subscription::subscription_group_config::SubscriptionGroupConfig;
49use rocketmq_remoting::runtime::RPCHook;
50use rocketmq_rust::ArcMut;
51use tracing::info;
52
53use crate::admin::mq_admin_ext_async::MQAdminExt;
54use crate::admin::mq_admin_ext_async_inner::MQAdminExtInnerImpl;
55use crate::base::client_config::ClientConfig;
56use crate::common::admin_tool_result::AdminToolResult;
57use crate::factory::mq_client_instance::MQClientInstance;
58use crate::implementation::mq_client_manager::MQClientManager;
59
60lazy_static! {
61    static ref SYSTEM_GROUP_SET: HashSet<CheetahString> = {
62        let mut set = HashSet::new();
63        set.insert(CheetahString::from(mix_all::DEFAULT_CONSUMER_GROUP));
64        set.insert(CheetahString::from(mix_all::DEFAULT_PRODUCER_GROUP));
65        set.insert(CheetahString::from(mix_all::TOOLS_CONSUMER_GROUP));
66        set.insert(CheetahString::from(mix_all::SCHEDULE_CONSUMER_GROUP));
67        set.insert(CheetahString::from(mix_all::FILTERSRV_CONSUMER_GROUP));
68        set.insert(CheetahString::from(mix_all::MONITOR_CONSUMER_GROUP));
69        set.insert(CheetahString::from(mix_all::CLIENT_INNER_PRODUCER_GROUP));
70        set.insert(CheetahString::from(mix_all::SELF_TEST_PRODUCER_GROUP));
71        set.insert(CheetahString::from(mix_all::SELF_TEST_CONSUMER_GROUP));
72        set.insert(CheetahString::from(mix_all::ONS_HTTP_PROXY_GROUP));
73        set.insert(CheetahString::from(mix_all::CID_ONSAPI_PERMISSION_GROUP));
74        set.insert(CheetahString::from(mix_all::CID_ONSAPI_OWNER_GROUP));
75        set.insert(CheetahString::from(mix_all::CID_ONSAPI_PULL_GROUP));
76        set.insert(CheetahString::from(mix_all::CID_SYS_RMQ_TRANS));
77        set
78    };
79}
80
81const SOCKS_PROXY_JSON: &str = "socksProxyJson";
82const NAMESPACE_ORDER_TOPIC_CONFIG: &str = "ORDER_TOPIC_CONFIG";
83pub struct DefaultMQAdminExtImpl {
84    service_state: ServiceState,
85    client_instance: Option<ArcMut<MQClientInstance>>,
86    rpc_hook: Option<Arc<dyn RPCHook>>,
87    timeout_millis: Duration,
88    kv_namespace_to_delete_list: Vec<CheetahString>,
89    client_config: ArcMut<ClientConfig>,
90    admin_ext_group: CheetahString,
91    inner: Option<ArcMut<DefaultMQAdminExtImpl>>,
92}
93
94impl DefaultMQAdminExtImpl {
95    pub fn new(
96        rpc_hook: Option<Arc<dyn RPCHook>>,
97        timeout_millis: Duration,
98        client_config: ArcMut<ClientConfig>,
99        admin_ext_group: CheetahString,
100    ) -> Self {
101        DefaultMQAdminExtImpl {
102            service_state: ServiceState::CreateJust,
103            client_instance: None,
104            rpc_hook,
105            timeout_millis,
106            kv_namespace_to_delete_list: vec![CheetahString::from_static_str(
107                NAMESPACE_ORDER_TOPIC_CONFIG,
108            )],
109            client_config,
110            admin_ext_group,
111            inner: None,
112        }
113    }
114
115    pub fn set_inner(&mut self, inner: ArcMut<DefaultMQAdminExtImpl>) {
116        self.inner = Some(inner);
117    }
118}
119
120#[allow(unused_variables)]
121#[allow(unused_mut)]
122impl MQAdminExt for DefaultMQAdminExtImpl {
123    async fn start(&mut self) -> rocketmq_error::RocketMQResult<()> {
124        match self.service_state {
125            ServiceState::CreateJust => {
126                self.service_state = ServiceState::StartFailed;
127                self.client_config.change_instance_name_to_pid();
128                if "{}".eq(&self.client_config.socks_proxy_config) {
129                    self.client_config.socks_proxy_config = env::var(SOCKS_PROXY_JSON)
130                        .unwrap_or_else(|_| "{}".to_string())
131                        .into();
132                }
133                self.client_instance = Some(
134                    MQClientManager::get_instance().get_or_create_mq_client_instance(
135                        self.client_config.as_ref().clone(),
136                        self.rpc_hook.clone(),
137                    ),
138                );
139
140                let group = &self.admin_ext_group.clone();
141                let register_ok = self
142                    .client_instance
143                    .as_mut()
144                    .unwrap()
145                    .register_admin_ext(
146                        group,
147                        MQAdminExtInnerImpl {
148                            inner: self.inner.as_ref().unwrap().clone(),
149                        },
150                    )
151                    .await;
152                if !register_ok {
153                    self.service_state = ServiceState::StartFailed;
154                    return Err(rocketmq_error::RocketMQError::illegal_argument(format!(
155                        "The adminExt group[{}] has created already, specified another name \
156                         please.{}",
157                        self.admin_ext_group,
158                        FAQUrl::suggest_todo(FAQUrl::GROUP_NAME_DUPLICATE_URL)
159                    )));
160                }
161                let arc_mut = self.client_instance.clone().unwrap();
162                self.client_instance
163                    .as_mut()
164                    .unwrap()
165                    .start(arc_mut)
166                    .await?;
167                self.service_state = ServiceState::Running;
168                info!("the adminExt [{}] start OK", self.admin_ext_group);
169                Ok(())
170            }
171            ServiceState::Running | ServiceState::ShutdownAlready | ServiceState::StartFailed => {
172                unimplemented!()
173            }
174        }
175    }
176
177    async fn shutdown(&mut self) {
178        match self.service_state {
179            ServiceState::CreateJust
180            | ServiceState::ShutdownAlready
181            | ServiceState::StartFailed => {
182                // do nothing
183            }
184            ServiceState::Running => {
185                let instance = self.client_instance.as_mut().unwrap();
186                instance.unregister_admin_ext(&self.admin_ext_group).await;
187                instance.shutdown().await;
188                self.service_state = ServiceState::ShutdownAlready;
189            }
190        }
191    }
192
193    async fn add_broker_to_container(
194        &self,
195        broker_container_addr: CheetahString,
196        broker_config: CheetahString,
197    ) -> rocketmq_error::RocketMQResult<()> {
198        todo!()
199    }
200
201    async fn remove_broker_from_container(
202        &self,
203        broker_container_addr: CheetahString,
204        cluster_name: CheetahString,
205        broker_name: CheetahString,
206        broker_id: u64,
207    ) -> rocketmq_error::RocketMQResult<()> {
208        todo!()
209    }
210
211    async fn update_broker_config(
212        &self,
213        broker_addr: CheetahString,
214        properties: HashMap<CheetahString, CheetahString>,
215    ) -> rocketmq_error::RocketMQResult<()> {
216        todo!()
217    }
218
219    async fn get_broker_config(
220        &self,
221        broker_addr: CheetahString,
222    ) -> rocketmq_error::RocketMQResult<HashMap<CheetahString, CheetahString>> {
223        todo!()
224    }
225
226    async fn create_and_update_topic_config(
227        &self,
228        addr: CheetahString,
229        config: TopicConfig,
230    ) -> rocketmq_error::RocketMQResult<()> {
231        todo!()
232    }
233
234    async fn create_and_update_topic_config_list(
235        &self,
236        addr: CheetahString,
237        topic_config_list: Vec<TopicConfig>,
238    ) -> rocketmq_error::RocketMQResult<()> {
239        todo!()
240    }
241
242    async fn create_and_update_plain_access_config(
243        &self,
244        addr: CheetahString,
245        config: PlainAccessConfig,
246    ) -> rocketmq_error::RocketMQResult<()> {
247        todo!()
248    }
249
250    async fn delete_plain_access_config(
251        &self,
252        addr: CheetahString,
253        access_key: CheetahString,
254    ) -> rocketmq_error::RocketMQResult<()> {
255        todo!()
256    }
257
258    async fn update_global_white_addr_config(
259        &self,
260        addr: CheetahString,
261        global_white_addrs: CheetahString,
262        acl_file_full_path: Option<CheetahString>,
263    ) -> rocketmq_error::RocketMQResult<()> {
264        todo!()
265    }
266
267    async fn examine_broker_cluster_acl_version_info(
268        &self,
269        addr: CheetahString,
270    ) -> rocketmq_error::RocketMQResult<CheetahString> {
271        todo!()
272    }
273
274    async fn create_and_update_subscription_group_config(
275        &self,
276        addr: CheetahString,
277        config: SubscriptionGroupConfig,
278    ) -> rocketmq_error::RocketMQResult<()> {
279        todo!()
280    }
281
282    async fn create_and_update_subscription_group_config_list(
283        &self,
284        broker_addr: CheetahString,
285        configs: Vec<SubscriptionGroupConfig>,
286    ) -> rocketmq_error::RocketMQResult<()> {
287        todo!()
288    }
289
290    async fn examine_subscription_group_config(
291        &self,
292        addr: CheetahString,
293        group: CheetahString,
294    ) -> rocketmq_error::RocketMQResult<SubscriptionGroupConfig> {
295        todo!()
296    }
297
298    async fn examine_topic_stats(
299        &self,
300        topic: CheetahString,
301        broker_addr: Option<CheetahString>,
302    ) -> rocketmq_error::RocketMQResult<TopicStatsTable> {
303        todo!()
304    }
305
306    async fn examine_topic_stats_concurrent(
307        &self,
308        topic: CheetahString,
309    ) -> AdminToolResult<TopicStatsTable> {
310        todo!()
311    }
312
313    async fn fetch_all_topic_list(&self) -> rocketmq_error::RocketMQResult<TopicList> {
314        todo!()
315    }
316
317    async fn fetch_topics_by_cluster(
318        &self,
319        cluster_name: CheetahString,
320    ) -> rocketmq_error::RocketMQResult<TopicList> {
321        todo!()
322    }
323
324    async fn fetch_broker_runtime_stats(
325        &self,
326        broker_addr: CheetahString,
327    ) -> rocketmq_error::RocketMQResult<KVTable> {
328        todo!()
329    }
330
331    async fn examine_consume_stats(
332        &self,
333        consumer_group: CheetahString,
334        topic: Option<CheetahString>,
335        cluster_name: Option<CheetahString>,
336        broker_addr: Option<CheetahString>,
337        timeout_millis: Option<u64>,
338    ) -> rocketmq_error::RocketMQResult<ConsumeStats> {
339        todo!()
340    }
341
342    async fn examine_broker_cluster_info(&self) -> rocketmq_error::RocketMQResult<ClusterInfo> {
343        self.client_instance
344            .as_ref()
345            .unwrap()
346            .get_mq_client_api_impl()
347            .get_broker_cluster_info(self.timeout_millis.as_millis() as u64)
348            .await
349    }
350
351    async fn examine_topic_route_info(
352        &self,
353        topic: CheetahString,
354    ) -> rocketmq_error::RocketMQResult<Option<TopicRouteData>> {
355        self.client_instance
356            .as_ref()
357            .unwrap()
358            .mq_client_api_impl
359            .as_ref()
360            .unwrap()
361            .get_topic_route_info_from_name_server(&topic, self.timeout_millis.as_millis() as u64)
362            .await
363    }
364
365    async fn examine_consumer_connection_info(
366        &self,
367        consumer_group: CheetahString,
368        broker_addr: Option<CheetahString>,
369    ) -> rocketmq_error::RocketMQResult<ConsumerConnection> {
370        todo!()
371    }
372
373    async fn examine_producer_connection_info(
374        &self,
375        producer_group: CheetahString,
376        topic: CheetahString,
377    ) -> rocketmq_error::RocketMQResult<ProducerConnection> {
378        todo!()
379    }
380
381    async fn get_name_server_address_list(&self) -> Vec<CheetahString> {
382        self.client_instance
383            .as_ref()
384            .unwrap()
385            .get_mq_client_api_impl()
386            .get_name_server_address_list()
387            .to_vec()
388    }
389
390    async fn wipe_write_perm_of_broker(
391        &self,
392        namesrv_addr: CheetahString,
393        broker_name: CheetahString,
394    ) -> rocketmq_error::RocketMQResult<i32> {
395        self.client_instance
396            .as_ref()
397            .unwrap()
398            .get_mq_client_api_impl()
399            .wipe_write_perm_of_broker(
400                namesrv_addr,
401                broker_name,
402                self.timeout_millis.as_millis() as u64,
403            )
404            .await
405    }
406
407    async fn add_write_perm_of_broker(
408        &self,
409        namesrv_addr: CheetahString,
410        broker_name: CheetahString,
411    ) -> rocketmq_error::RocketMQResult<i32> {
412        self.client_instance
413            .as_ref()
414            .unwrap()
415            .get_mq_client_api_impl()
416            .add_write_perm_of_broker(
417                namesrv_addr,
418                broker_name,
419                self.timeout_millis.as_millis() as u64,
420            )
421            .await
422    }
423
424    async fn put_kv_config(
425        &self,
426        namespace: CheetahString,
427        key: CheetahString,
428        value: CheetahString,
429    ) {
430        todo!()
431    }
432
433    async fn get_kv_config(
434        &self,
435        namespace: CheetahString,
436        key: CheetahString,
437    ) -> rocketmq_error::RocketMQResult<CheetahString> {
438        todo!()
439    }
440
441    async fn get_kv_list_by_namespace(
442        &self,
443        namespace: CheetahString,
444    ) -> rocketmq_error::RocketMQResult<KVTable> {
445        todo!()
446    }
447
448    async fn delete_topic(
449        &self,
450        topic_name: CheetahString,
451        cluster_name: CheetahString,
452    ) -> rocketmq_error::RocketMQResult<()> {
453        todo!()
454    }
455
456    async fn delete_topic_in_broker(
457        &self,
458        addrs: HashSet<CheetahString>,
459        topic: CheetahString,
460    ) -> rocketmq_error::RocketMQResult<()> {
461        todo!()
462    }
463
464    async fn delete_topic_in_name_server(
465        &self,
466        addrs: HashSet<CheetahString>,
467        cluster_name: Option<CheetahString>,
468        topic: CheetahString,
469    ) -> rocketmq_error::RocketMQResult<()> {
470        todo!()
471    }
472
473    async fn delete_subscription_group(
474        &self,
475        addr: CheetahString,
476        group_name: CheetahString,
477        remove_offset: Option<bool>,
478    ) -> rocketmq_error::RocketMQResult<()> {
479        todo!()
480    }
481
482    async fn create_and_update_kv_config(
483        &self,
484        namespace: CheetahString,
485        key: CheetahString,
486        value: CheetahString,
487    ) -> rocketmq_error::RocketMQResult<()> {
488        self.client_instance
489            .as_ref()
490            .unwrap()
491            .get_mq_client_api_impl()
492            .put_kvconfig_value(
493                namespace,
494                key,
495                value,
496                self.timeout_millis.as_millis() as u64,
497            )
498            .await
499    }
500
501    async fn delete_kv_config(
502        &self,
503        namespace: CheetahString,
504        key: CheetahString,
505    ) -> rocketmq_error::RocketMQResult<()> {
506        self.client_instance
507            .as_ref()
508            .unwrap()
509            .get_mq_client_api_impl()
510            .delete_kvconfig_value(namespace, key, self.timeout_millis.as_millis() as u64)
511            .await
512    }
513
514    async fn reset_offset_by_timestamp(
515        &self,
516        cluster_name: Option<CheetahString>,
517        topic: CheetahString,
518        group: CheetahString,
519        timestamp: u64,
520        is_force: bool,
521    ) -> rocketmq_error::RocketMQResult<HashMap<MessageQueue, u64>> {
522        todo!()
523    }
524
525    async fn reset_offset_new(
526        &self,
527        consumer_group: CheetahString,
528        topic: CheetahString,
529        timestamp: u64,
530    ) -> rocketmq_error::RocketMQResult<()> {
531        todo!()
532    }
533
534    async fn get_consume_status(
535        &self,
536        topic: CheetahString,
537        group: CheetahString,
538        client_addr: CheetahString,
539    ) -> rocketmq_error::RocketMQResult<HashMap<CheetahString, HashMap<MessageQueue, u64>>> {
540        todo!()
541    }
542
543    async fn create_or_update_order_conf(
544        &self,
545        key: CheetahString,
546        value: CheetahString,
547        is_cluster: bool,
548    ) -> rocketmq_error::RocketMQResult<()> {
549        todo!()
550    }
551
552    async fn query_topic_consume_by_who(
553        &self,
554        topic: CheetahString,
555    ) -> rocketmq_error::RocketMQResult<GroupList> {
556        todo!()
557    }
558
559    async fn query_topics_by_consumer(
560        &self,
561        group: CheetahString,
562    ) -> rocketmq_error::RocketMQResult<TopicList> {
563        todo!()
564    }
565
566    async fn query_topics_by_consumer_concurrent(
567        &self,
568        group: CheetahString,
569    ) -> AdminToolResult<TopicList> {
570        todo!()
571    }
572
573    async fn query_subscription(
574        &self,
575        group: CheetahString,
576        topic: CheetahString,
577    ) -> rocketmq_error::RocketMQResult<SubscriptionData> {
578        todo!()
579    }
580
581    async fn clean_expired_consumer_queue(
582        &self,
583        cluster: Option<CheetahString>,
584        addr: Option<CheetahString>,
585    ) -> rocketmq_error::RocketMQResult<bool> {
586        todo!()
587    }
588
589    async fn delete_expired_commit_log(
590        &self,
591        cluster: Option<CheetahString>,
592        addr: Option<CheetahString>,
593    ) -> rocketmq_error::RocketMQResult<bool> {
594        todo!()
595    }
596
597    async fn clean_unused_topic(
598        &self,
599        cluster: Option<CheetahString>,
600        addr: Option<CheetahString>,
601    ) -> rocketmq_error::RocketMQResult<bool> {
602        todo!()
603    }
604
605    async fn get_consumer_running_info(
606        &self,
607        consumer_group: CheetahString,
608        client_id: CheetahString,
609        jstack: bool,
610        metrics: Option<bool>,
611    ) -> rocketmq_error::RocketMQResult<ConsumerRunningInfo> {
612        todo!()
613    }
614
615    async fn consume_message_directly(
616        &self,
617        consumer_group: CheetahString,
618        client_id: CheetahString,
619        topic: CheetahString,
620        msg_id: CheetahString,
621    ) -> rocketmq_error::RocketMQResult<ConsumeMessageDirectlyResult> {
622        todo!()
623    }
624
625    async fn consume_message_directly_ext(
626        &self,
627        cluster_name: CheetahString,
628        consumer_group: CheetahString,
629        client_id: CheetahString,
630        topic: CheetahString,
631        msg_id: CheetahString,
632    ) -> rocketmq_error::RocketMQResult<ConsumeMessageDirectlyResult> {
633        todo!()
634    }
635
636    async fn clone_group_offset(
637        &self,
638        src_group: CheetahString,
639        dest_group: CheetahString,
640        topic: CheetahString,
641        is_offline: bool,
642    ) -> rocketmq_error::RocketMQResult<()> {
643        todo!()
644    }
645
646    async fn get_cluster_list(
647        &self,
648        topic: String,
649    ) -> rocketmq_error::RocketMQResult<HashSet<CheetahString>> {
650        todo!()
651    }
652
653    async fn get_topic_cluster_list(
654        &self,
655        topic: String,
656    ) -> rocketmq_error::RocketMQResult<HashSet<CheetahString>> {
657        let cluster_info = self.examine_broker_cluster_info().await?;
658        let topic_route_data = self.examine_topic_route_info(topic.into()).await?.unwrap();
659        let broker_data = topic_route_data
660            .broker_datas
661            .first()
662            .ok_or_else(|| mq_client_err!("Broker datas is empty"))?;
663        let mut cluster_set = HashSet::new();
664        let broker_name = broker_data.broker_name();
665        if let Some(cluster_addr_table) = cluster_info.cluster_addr_table.as_ref() {
666            cluster_set.extend(
667                cluster_addr_table
668                    .iter()
669                    .filter(|(cluster_name, broker_names)| broker_names.contains(broker_name))
670                    .map(|(cluster_name, broker_names)| cluster_name.clone()),
671            );
672        }
673        Ok(cluster_set)
674    }
675
676    async fn get_all_topic_config(
677        &self,
678        broker_addr: CheetahString,
679        timeout_millis: u64,
680    ) -> rocketmq_error::RocketMQResult<TopicConfigSerializeWrapper> {
681        todo!()
682    }
683
684    async fn get_user_topic_config(
685        &self,
686        broker_addr: CheetahString,
687        special_topic: bool,
688        timeout_millis: u64,
689    ) -> rocketmq_error::RocketMQResult<TopicConfigSerializeWrapper> {
690        todo!()
691    }
692
693    async fn update_consume_offset(
694        &self,
695        broker_addr: CheetahString,
696        consume_group: CheetahString,
697        mq: MessageQueue,
698        offset: u64,
699    ) -> rocketmq_error::RocketMQResult<()> {
700        todo!()
701    }
702
703    async fn update_name_server_config(
704        &self,
705        properties: HashMap<CheetahString, CheetahString>,
706        name_servers: Option<Vec<CheetahString>>,
707    ) -> rocketmq_error::RocketMQResult<()> {
708        self.client_instance
709            .as_ref()
710            .unwrap()
711            .get_mq_client_api_impl()
712            .update_name_server_config(
713                properties,
714                name_servers,
715                self.timeout_millis.as_millis() as u64,
716            )
717            .await
718    }
719
720    async fn get_name_server_config(
721        &self,
722        name_servers: Vec<CheetahString>,
723    ) -> rocketmq_error::RocketMQResult<HashMap<CheetahString, HashMap<CheetahString, CheetahString>>>
724    {
725        Ok(self
726            .client_instance
727            .as_ref()
728            .unwrap()
729            .mq_client_api_impl
730            .as_ref()
731            .unwrap()
732            .get_name_server_config(Some(name_servers), self.timeout_millis)
733            .await?
734            .unwrap_or_default())
735    }
736
737    async fn resume_check_half_message(
738        &self,
739        topic: CheetahString,
740        msg_id: CheetahString,
741    ) -> rocketmq_error::RocketMQResult<bool> {
742        todo!()
743    }
744
745    async fn set_message_request_mode(
746        &self,
747        broker_addr: CheetahString,
748        topic: CheetahString,
749        consumer_group: CheetahString,
750        mode: MessageRequestMode,
751        pop_work_group_size: i32,
752        timeout_millis: u64,
753    ) -> rocketmq_error::RocketMQResult<()> {
754        let mut mq_client_api = self
755            .client_instance
756            .as_ref()
757            .unwrap()
758            .get_mq_client_api_impl();
759        match mq_client_api
760            .set_message_request_mode(
761                &broker_addr,
762                &topic,
763                &consumer_group,
764                mode,
765                pop_work_group_size,
766                timeout_millis,
767            )
768            .await
769        {
770            Ok(_) => Ok(()),
771            Err(e) => Err(e),
772        }
773    }
774
775    async fn reset_offset_by_queue_id(
776        &self,
777        broker_addr: CheetahString,
778        consumer_group: CheetahString,
779        topic_name: CheetahString,
780        queue_id: i32,
781        reset_offset: u64,
782    ) -> rocketmq_error::RocketMQResult<()> {
783        todo!()
784    }
785
786    async fn examine_topic_config(
787        &self,
788        addr: CheetahString,
789        topic: CheetahString,
790    ) -> rocketmq_error::RocketMQResult<TopicConfig> {
791        todo!()
792    }
793
794    async fn create_static_topic(
795        &self,
796        addr: CheetahString,
797        default_topic: CheetahString,
798        topic_config: TopicConfig,
799        mapping_detail: TopicQueueMappingDetail,
800        force: bool,
801    ) -> rocketmq_error::RocketMQResult<()> {
802        todo!()
803    }
804
805    async fn reset_master_flush_offset(
806        &self,
807        broker_addr: CheetahString,
808        master_flush_offset: u64,
809    ) -> rocketmq_error::RocketMQResult<()> {
810        todo!()
811    }
812
813    async fn get_controller_config(
814        &self,
815        controller_servers: Vec<CheetahString>,
816    ) -> rocketmq_error::RocketMQResult<HashMap<CheetahString, HashMap<CheetahString, CheetahString>>>
817    {
818        todo!()
819    }
820
821    async fn update_controller_config(
822        &self,
823        properties: HashMap<CheetahString, CheetahString>,
824        controllers: Vec<CheetahString>,
825    ) -> rocketmq_error::RocketMQResult<()> {
826        todo!()
827    }
828
829    async fn clean_controller_broker_data(
830        &self,
831        controller_addr: CheetahString,
832        cluster_name: CheetahString,
833        broker_name: CheetahString,
834        broker_controller_ids_to_clean: Option<CheetahString>,
835        is_clean_living_broker: bool,
836    ) -> rocketmq_error::RocketMQResult<()> {
837        todo!()
838    }
839
840    async fn update_cold_data_flow_ctr_group_config(
841        &self,
842        broker_addr: CheetahString,
843        properties: HashMap<CheetahString, CheetahString>,
844    ) -> rocketmq_error::RocketMQResult<()> {
845        todo!()
846    }
847
848    async fn remove_cold_data_flow_ctr_group_config(
849        &self,
850        broker_addr: CheetahString,
851        consumer_group: CheetahString,
852    ) -> rocketmq_error::RocketMQResult<()> {
853        todo!()
854    }
855
856    async fn get_cold_data_flow_ctr_info(
857        &self,
858        broker_addr: CheetahString,
859    ) -> rocketmq_error::RocketMQResult<CheetahString> {
860        todo!()
861    }
862
863    async fn set_commit_log_read_ahead_mode(
864        &self,
865        broker_addr: CheetahString,
866        mode: CheetahString,
867    ) -> rocketmq_error::RocketMQResult<CheetahString> {
868        todo!()
869    }
870
871    async fn create_user(
872        &self,
873        broker_addr: CheetahString,
874        username: CheetahString,
875        password: CheetahString,
876        user_type: CheetahString,
877    ) -> rocketmq_error::RocketMQResult<()> {
878        todo!()
879    }
880
881    async fn update_user(
882        &self,
883        broker_addr: CheetahString,
884        username: CheetahString,
885        password: CheetahString,
886        user_type: CheetahString,
887        user_status: CheetahString,
888    ) -> rocketmq_error::RocketMQResult<()> {
889        todo!()
890    }
891
892    async fn delete_user(
893        &self,
894        broker_addr: CheetahString,
895        username: CheetahString,
896    ) -> rocketmq_error::RocketMQResult<()> {
897        todo!()
898    }
899
900    async fn create_acl(
901        &self,
902        broker_addr: CheetahString,
903        subject: CheetahString,
904        resources: Vec<CheetahString>,
905        actions: Vec<CheetahString>,
906        source_ips: Vec<CheetahString>,
907        decision: CheetahString,
908    ) -> rocketmq_error::RocketMQResult<()> {
909        todo!()
910    }
911
912    async fn update_acl(
913        &self,
914        broker_addr: CheetahString,
915        subject: CheetahString,
916        resources: Vec<CheetahString>,
917        actions: Vec<CheetahString>,
918        source_ips: Vec<CheetahString>,
919        decision: CheetahString,
920    ) -> rocketmq_error::RocketMQResult<()> {
921        todo!()
922    }
923
924    async fn delete_acl(
925        &self,
926        broker_addr: CheetahString,
927        subject: CheetahString,
928        resource: CheetahString,
929    ) -> rocketmq_error::RocketMQResult<()> {
930        todo!()
931    }
932}