rocketmq_client_rust/admin/
default_mq_admin_ext_impl.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18#![allow(dead_code)]
19use std::collections::HashMap;
20use std::collections::HashSet;
21use std::env;
22use std::sync::Arc;
23use std::time::Duration;
24
25use cheetah_string::CheetahString;
26use lazy_static::lazy_static;
27use rocketmq_common::common::base::plain_access_config::PlainAccessConfig;
28use rocketmq_common::common::base::service_state::ServiceState;
29use rocketmq_common::common::config::TopicConfig;
30use rocketmq_common::common::message::message_enum::MessageRequestMode;
31use rocketmq_common::common::message::message_queue::MessageQueue;
32use rocketmq_common::common::mix_all;
33use rocketmq_common::common::FAQUrl;
34use rocketmq_error::ClientErr;
35use rocketmq_remoting::protocol::admin::consume_stats::ConsumeStats;
36use rocketmq_remoting::protocol::admin::topic_stats_table::TopicStatsTable;
37use rocketmq_remoting::protocol::body::broker_body::cluster_info::ClusterInfo;
38use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;
39use rocketmq_remoting::protocol::body::consumer_connection::ConsumerConnection;
40use rocketmq_remoting::protocol::body::consumer_running_info::ConsumerRunningInfo;
41use rocketmq_remoting::protocol::body::group_list::GroupList;
42use rocketmq_remoting::protocol::body::kv_table::KVTable;
43use rocketmq_remoting::protocol::body::producer_connection::ProducerConnection;
44use rocketmq_remoting::protocol::body::topic::topic_list::TopicList;
45use rocketmq_remoting::protocol::body::topic_info_wrapper::TopicConfigSerializeWrapper;
46use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
47use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData;
48use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_detail::TopicQueueMappingDetail;
49use rocketmq_remoting::protocol::subscription::subscription_group_config::SubscriptionGroupConfig;
50use rocketmq_remoting::runtime::RPCHook;
51use rocketmq_rust::ArcMut;
52use tracing::info;
53
54use crate::admin::mq_admin_ext_async::MQAdminExt;
55use crate::admin::mq_admin_ext_async_inner::MQAdminExtInnerImpl;
56use crate::base::client_config::ClientConfig;
57use crate::common::admin_tool_result::AdminToolResult;
58use crate::factory::mq_client_instance::MQClientInstance;
59use crate::implementation::mq_client_manager::MQClientManager;
60
61lazy_static! {
62    static ref SYSTEM_GROUP_SET: HashSet<CheetahString> = {
63        let mut set = HashSet::new();
64        set.insert(CheetahString::from(mix_all::DEFAULT_CONSUMER_GROUP));
65        set.insert(CheetahString::from(mix_all::DEFAULT_PRODUCER_GROUP));
66        set.insert(CheetahString::from(mix_all::TOOLS_CONSUMER_GROUP));
67        set.insert(CheetahString::from(mix_all::SCHEDULE_CONSUMER_GROUP));
68        set.insert(CheetahString::from(mix_all::FILTERSRV_CONSUMER_GROUP));
69        set.insert(CheetahString::from(mix_all::MONITOR_CONSUMER_GROUP));
70        set.insert(CheetahString::from(mix_all::CLIENT_INNER_PRODUCER_GROUP));
71        set.insert(CheetahString::from(mix_all::SELF_TEST_PRODUCER_GROUP));
72        set.insert(CheetahString::from(mix_all::SELF_TEST_CONSUMER_GROUP));
73        set.insert(CheetahString::from(mix_all::ONS_HTTP_PROXY_GROUP));
74        set.insert(CheetahString::from(mix_all::CID_ONSAPI_PERMISSION_GROUP));
75        set.insert(CheetahString::from(mix_all::CID_ONSAPI_OWNER_GROUP));
76        set.insert(CheetahString::from(mix_all::CID_ONSAPI_PULL_GROUP));
77        set.insert(CheetahString::from(mix_all::CID_SYS_RMQ_TRANS));
78        set
79    };
80}
81
82const SOCKS_PROXY_JSON: &str = "socksProxyJson";
83const NAMESPACE_ORDER_TOPIC_CONFIG: &str = "ORDER_TOPIC_CONFIG";
84pub struct DefaultMQAdminExtImpl {
85    service_state: ServiceState,
86    client_instance: Option<ArcMut<MQClientInstance>>,
87    rpc_hook: Option<Arc<Box<dyn RPCHook>>>,
88    timeout_millis: Duration,
89    kv_namespace_to_delete_list: Vec<CheetahString>,
90    client_config: ArcMut<ClientConfig>,
91    admin_ext_group: CheetahString,
92    inner: Option<ArcMut<DefaultMQAdminExtImpl>>,
93}
94
95impl DefaultMQAdminExtImpl {
96    pub fn new(
97        rpc_hook: Option<Arc<Box<dyn RPCHook>>>,
98        timeout_millis: Duration,
99        client_config: ArcMut<ClientConfig>,
100        admin_ext_group: CheetahString,
101    ) -> Self {
102        DefaultMQAdminExtImpl {
103            service_state: ServiceState::CreateJust,
104            client_instance: None,
105            rpc_hook,
106            timeout_millis,
107            kv_namespace_to_delete_list: vec![CheetahString::from_static_str(
108                NAMESPACE_ORDER_TOPIC_CONFIG,
109            )],
110            client_config,
111            admin_ext_group,
112            inner: None,
113        }
114    }
115
116    pub fn set_inner(&mut self, inner: ArcMut<DefaultMQAdminExtImpl>) {
117        self.inner = Some(inner);
118    }
119}
120
121#[allow(unused_variables)]
122#[allow(unused_mut)]
123impl MQAdminExt for DefaultMQAdminExtImpl {
124    async fn start(&mut self) -> rocketmq_error::RocketMQResult<()> {
125        match self.service_state {
126            ServiceState::CreateJust => {
127                self.service_state = ServiceState::StartFailed;
128                self.client_config.change_instance_name_to_pid();
129                if "{}".eq(&self.client_config.socks_proxy_config) {
130                    self.client_config.socks_proxy_config = env::var(SOCKS_PROXY_JSON)
131                        .unwrap_or_else(|_| "{}".to_string())
132                        .into();
133                }
134                self.client_instance = Some(
135                    MQClientManager::get_instance().get_or_create_mq_client_instance(
136                        self.client_config.as_ref().clone(),
137                        self.rpc_hook.clone(),
138                    ),
139                );
140
141                let group = &self.admin_ext_group.clone();
142                let register_ok = self
143                    .client_instance
144                    .as_mut()
145                    .unwrap()
146                    .register_admin_ext(
147                        group,
148                        MQAdminExtInnerImpl {
149                            inner: self.inner.as_ref().unwrap().clone(),
150                        },
151                    )
152                    .await;
153                if !register_ok {
154                    self.service_state = ServiceState::StartFailed;
155                    return Err(rocketmq_error::RocketmqError::MQClientErr(ClientErr::new(
156                        format!(
157                            "The adminExt group[{}] has created already, specified another name \
158                             please.{}",
159                            self.admin_ext_group,
160                            FAQUrl::suggest_todo(FAQUrl::GROUP_NAME_DUPLICATE_URL)
161                        ),
162                    )));
163                }
164                let arc_mut = self.client_instance.clone().unwrap();
165                self.client_instance
166                    .as_mut()
167                    .unwrap()
168                    .start(arc_mut)
169                    .await?;
170                self.service_state = ServiceState::Running;
171                info!("the adminExt [{}] start OK", self.admin_ext_group);
172                Ok(())
173            }
174            ServiceState::Running | ServiceState::ShutdownAlready | ServiceState::StartFailed => {
175                unimplemented!()
176            }
177        }
178    }
179
180    async fn shutdown(&mut self) {
181        match self.service_state {
182            ServiceState::CreateJust
183            | ServiceState::ShutdownAlready
184            | ServiceState::StartFailed => {
185                // do nothing
186            }
187            ServiceState::Running => {
188                let instance = self.client_instance.as_mut().unwrap();
189                instance.unregister_admin_ext(&self.admin_ext_group).await;
190                instance.shutdown().await;
191                self.service_state = ServiceState::ShutdownAlready;
192            }
193        }
194    }
195
196    async fn add_broker_to_container(
197        &self,
198        broker_container_addr: CheetahString,
199        broker_config: CheetahString,
200    ) -> rocketmq_error::RocketMQResult<()> {
201        todo!()
202    }
203
204    async fn remove_broker_from_container(
205        &self,
206        broker_container_addr: CheetahString,
207        cluster_name: CheetahString,
208        broker_name: CheetahString,
209        broker_id: u64,
210    ) -> rocketmq_error::RocketMQResult<()> {
211        todo!()
212    }
213
214    async fn update_broker_config(
215        &self,
216        broker_addr: CheetahString,
217        properties: HashMap<CheetahString, CheetahString>,
218    ) -> rocketmq_error::RocketMQResult<()> {
219        todo!()
220    }
221
222    async fn get_broker_config(
223        &self,
224        broker_addr: CheetahString,
225    ) -> rocketmq_error::RocketMQResult<HashMap<CheetahString, CheetahString>> {
226        todo!()
227    }
228
229    async fn create_and_update_topic_config(
230        &self,
231        addr: CheetahString,
232        config: TopicConfig,
233    ) -> rocketmq_error::RocketMQResult<()> {
234        todo!()
235    }
236
237    async fn create_and_update_topic_config_list(
238        &self,
239        addr: CheetahString,
240        topic_config_list: Vec<TopicConfig>,
241    ) -> rocketmq_error::RocketMQResult<()> {
242        todo!()
243    }
244
245    async fn create_and_update_plain_access_config(
246        &self,
247        addr: CheetahString,
248        config: PlainAccessConfig,
249    ) -> rocketmq_error::RocketMQResult<()> {
250        todo!()
251    }
252
253    async fn delete_plain_access_config(
254        &self,
255        addr: CheetahString,
256        access_key: CheetahString,
257    ) -> rocketmq_error::RocketMQResult<()> {
258        todo!()
259    }
260
261    async fn update_global_white_addr_config(
262        &self,
263        addr: CheetahString,
264        global_white_addrs: CheetahString,
265        acl_file_full_path: Option<CheetahString>,
266    ) -> rocketmq_error::RocketMQResult<()> {
267        todo!()
268    }
269
270    async fn examine_broker_cluster_acl_version_info(
271        &self,
272        addr: CheetahString,
273    ) -> rocketmq_error::RocketMQResult<CheetahString> {
274        todo!()
275    }
276
277    async fn create_and_update_subscription_group_config(
278        &self,
279        addr: CheetahString,
280        config: SubscriptionGroupConfig,
281    ) -> rocketmq_error::RocketMQResult<()> {
282        todo!()
283    }
284
285    async fn create_and_update_subscription_group_config_list(
286        &self,
287        broker_addr: CheetahString,
288        configs: Vec<SubscriptionGroupConfig>,
289    ) -> rocketmq_error::RocketMQResult<()> {
290        todo!()
291    }
292
293    async fn examine_subscription_group_config(
294        &self,
295        addr: CheetahString,
296        group: CheetahString,
297    ) -> rocketmq_error::RocketMQResult<SubscriptionGroupConfig> {
298        todo!()
299    }
300
301    async fn examine_topic_stats(
302        &self,
303        topic: CheetahString,
304        broker_addr: Option<CheetahString>,
305    ) -> rocketmq_error::RocketMQResult<TopicStatsTable> {
306        todo!()
307    }
308
309    async fn examine_topic_stats_concurrent(
310        &self,
311        topic: CheetahString,
312    ) -> AdminToolResult<TopicStatsTable> {
313        todo!()
314    }
315
316    async fn fetch_all_topic_list(&self) -> rocketmq_error::RocketMQResult<TopicList> {
317        todo!()
318    }
319
320    async fn fetch_topics_by_cluster(
321        &self,
322        cluster_name: CheetahString,
323    ) -> rocketmq_error::RocketMQResult<TopicList> {
324        todo!()
325    }
326
327    async fn fetch_broker_runtime_stats(
328        &self,
329        broker_addr: CheetahString,
330    ) -> rocketmq_error::RocketMQResult<KVTable> {
331        todo!()
332    }
333
334    async fn examine_consume_stats(
335        &self,
336        consumer_group: CheetahString,
337        topic: Option<CheetahString>,
338        cluster_name: Option<CheetahString>,
339        broker_addr: Option<CheetahString>,
340        timeout_millis: Option<u64>,
341    ) -> rocketmq_error::RocketMQResult<ConsumeStats> {
342        todo!()
343    }
344
345    async fn examine_broker_cluster_info(&self) -> rocketmq_error::RocketMQResult<ClusterInfo> {
346        todo!()
347    }
348
349    async fn examine_topic_route_info(
350        &self,
351        topic: CheetahString,
352    ) -> rocketmq_error::RocketMQResult<Option<TopicRouteData>> {
353        self.client_instance
354            .as_ref()
355            .unwrap()
356            .mq_client_api_impl
357            .as_ref()
358            .unwrap()
359            .get_topic_route_info_from_name_server(&topic, self.timeout_millis.as_millis() as u64)
360            .await
361    }
362
363    async fn examine_consumer_connection_info(
364        &self,
365        consumer_group: CheetahString,
366        broker_addr: Option<CheetahString>,
367    ) -> rocketmq_error::RocketMQResult<ConsumerConnection> {
368        todo!()
369    }
370
371    async fn examine_producer_connection_info(
372        &self,
373        producer_group: CheetahString,
374        topic: CheetahString,
375    ) -> rocketmq_error::RocketMQResult<ProducerConnection> {
376        todo!()
377    }
378
379    async fn get_name_server_address_list(&self) -> Vec<CheetahString> {
380        self.client_instance
381            .as_ref()
382            .unwrap()
383            .get_mq_client_api_impl()
384            .get_name_server_address_list()
385            .to_vec()
386    }
387
388    async fn wipe_write_perm_of_broker(
389        &self,
390        namesrv_addr: CheetahString,
391        broker_name: CheetahString,
392    ) -> rocketmq_error::RocketMQResult<i32> {
393        self.client_instance
394            .as_ref()
395            .unwrap()
396            .get_mq_client_api_impl()
397            .wipe_write_perm_of_broker(
398                namesrv_addr,
399                broker_name,
400                self.timeout_millis.as_millis() as u64,
401            )
402            .await
403    }
404
405    async fn add_write_perm_of_broker(
406        &self,
407        namesrv_addr: CheetahString,
408        broker_name: CheetahString,
409    ) -> rocketmq_error::RocketMQResult<i32> {
410        self.client_instance
411            .as_ref()
412            .unwrap()
413            .get_mq_client_api_impl()
414            .add_write_perm_of_broker(
415                namesrv_addr,
416                broker_name,
417                self.timeout_millis.as_millis() as u64,
418            )
419            .await
420    }
421
422    async fn put_kv_config(
423        &self,
424        namespace: CheetahString,
425        key: CheetahString,
426        value: CheetahString,
427    ) {
428        todo!()
429    }
430
431    async fn get_kv_config(
432        &self,
433        namespace: CheetahString,
434        key: CheetahString,
435    ) -> rocketmq_error::RocketMQResult<CheetahString> {
436        todo!()
437    }
438
439    async fn get_kv_list_by_namespace(
440        &self,
441        namespace: CheetahString,
442    ) -> rocketmq_error::RocketMQResult<KVTable> {
443        todo!()
444    }
445
446    async fn delete_topic(
447        &self,
448        topic_name: CheetahString,
449        cluster_name: CheetahString,
450    ) -> rocketmq_error::RocketMQResult<()> {
451        todo!()
452    }
453
454    async fn delete_topic_in_broker(
455        &self,
456        addrs: HashSet<CheetahString>,
457        topic: CheetahString,
458    ) -> rocketmq_error::RocketMQResult<()> {
459        todo!()
460    }
461
462    async fn delete_topic_in_name_server(
463        &self,
464        addrs: HashSet<CheetahString>,
465        cluster_name: Option<CheetahString>,
466        topic: CheetahString,
467    ) -> rocketmq_error::RocketMQResult<()> {
468        todo!()
469    }
470
471    async fn delete_subscription_group(
472        &self,
473        addr: CheetahString,
474        group_name: CheetahString,
475        remove_offset: Option<bool>,
476    ) -> rocketmq_error::RocketMQResult<()> {
477        todo!()
478    }
479
480    async fn create_and_update_kv_config(
481        &self,
482        namespace: CheetahString,
483        key: CheetahString,
484        value: CheetahString,
485    ) -> rocketmq_error::RocketMQResult<()> {
486        self.client_instance
487            .as_ref()
488            .unwrap()
489            .get_mq_client_api_impl()
490            .put_kvconfig_value(
491                namespace,
492                key,
493                value,
494                self.timeout_millis.as_millis() as u64,
495            )
496            .await
497    }
498
499    async fn delete_kv_config(
500        &self,
501        namespace: CheetahString,
502        key: CheetahString,
503    ) -> rocketmq_error::RocketMQResult<()> {
504        self.client_instance
505            .as_ref()
506            .unwrap()
507            .get_mq_client_api_impl()
508            .delete_kvconfig_value(namespace, key, self.timeout_millis.as_millis() as u64)
509            .await
510    }
511
512    async fn reset_offset_by_timestamp(
513        &self,
514        cluster_name: Option<CheetahString>,
515        topic: CheetahString,
516        group: CheetahString,
517        timestamp: u64,
518        is_force: bool,
519    ) -> rocketmq_error::RocketMQResult<HashMap<MessageQueue, u64>> {
520        todo!()
521    }
522
523    async fn reset_offset_new(
524        &self,
525        consumer_group: CheetahString,
526        topic: CheetahString,
527        timestamp: u64,
528    ) -> rocketmq_error::RocketMQResult<()> {
529        todo!()
530    }
531
532    async fn get_consume_status(
533        &self,
534        topic: CheetahString,
535        group: CheetahString,
536        client_addr: CheetahString,
537    ) -> rocketmq_error::RocketMQResult<HashMap<CheetahString, HashMap<MessageQueue, u64>>> {
538        todo!()
539    }
540
541    async fn create_or_update_order_conf(
542        &self,
543        key: CheetahString,
544        value: CheetahString,
545        is_cluster: bool,
546    ) -> rocketmq_error::RocketMQResult<()> {
547        todo!()
548    }
549
550    async fn query_topic_consume_by_who(
551        &self,
552        topic: CheetahString,
553    ) -> rocketmq_error::RocketMQResult<GroupList> {
554        todo!()
555    }
556
557    async fn query_topics_by_consumer(
558        &self,
559        group: CheetahString,
560    ) -> rocketmq_error::RocketMQResult<TopicList> {
561        todo!()
562    }
563
564    async fn query_topics_by_consumer_concurrent(
565        &self,
566        group: CheetahString,
567    ) -> AdminToolResult<TopicList> {
568        todo!()
569    }
570
571    async fn query_subscription(
572        &self,
573        group: CheetahString,
574        topic: CheetahString,
575    ) -> rocketmq_error::RocketMQResult<SubscriptionData> {
576        todo!()
577    }
578
579    async fn clean_expired_consumer_queue(
580        &self,
581        cluster: Option<CheetahString>,
582        addr: Option<CheetahString>,
583    ) -> rocketmq_error::RocketMQResult<bool> {
584        todo!()
585    }
586
587    async fn delete_expired_commit_log(
588        &self,
589        cluster: Option<CheetahString>,
590        addr: Option<CheetahString>,
591    ) -> rocketmq_error::RocketMQResult<bool> {
592        todo!()
593    }
594
595    async fn clean_unused_topic(
596        &self,
597        cluster: Option<CheetahString>,
598        addr: Option<CheetahString>,
599    ) -> rocketmq_error::RocketMQResult<bool> {
600        todo!()
601    }
602
603    async fn get_consumer_running_info(
604        &self,
605        consumer_group: CheetahString,
606        client_id: CheetahString,
607        jstack: bool,
608        metrics: Option<bool>,
609    ) -> rocketmq_error::RocketMQResult<ConsumerRunningInfo> {
610        todo!()
611    }
612
613    async fn consume_message_directly(
614        &self,
615        consumer_group: CheetahString,
616        client_id: CheetahString,
617        topic: CheetahString,
618        msg_id: CheetahString,
619    ) -> rocketmq_error::RocketMQResult<ConsumeMessageDirectlyResult> {
620        todo!()
621    }
622
623    async fn consume_message_directly_ext(
624        &self,
625        cluster_name: CheetahString,
626        consumer_group: CheetahString,
627        client_id: CheetahString,
628        topic: CheetahString,
629        msg_id: CheetahString,
630    ) -> rocketmq_error::RocketMQResult<ConsumeMessageDirectlyResult> {
631        todo!()
632    }
633
634    async fn clone_group_offset(
635        &self,
636        src_group: CheetahString,
637        dest_group: CheetahString,
638        topic: CheetahString,
639        is_offline: bool,
640    ) -> rocketmq_error::RocketMQResult<()> {
641        todo!()
642    }
643
644    async fn get_cluster_list(
645        &self,
646        topic: String,
647    ) -> rocketmq_error::RocketMQResult<HashSet<CheetahString>> {
648        todo!()
649    }
650
651    async fn get_topic_cluster_list(
652        &self,
653        topic: String,
654    ) -> rocketmq_error::RocketMQResult<HashSet<CheetahString>> {
655        todo!()
656    }
657
658    async fn get_all_topic_config(
659        &self,
660        broker_addr: CheetahString,
661        timeout_millis: u64,
662    ) -> rocketmq_error::RocketMQResult<TopicConfigSerializeWrapper> {
663        todo!()
664    }
665
666    async fn get_user_topic_config(
667        &self,
668        broker_addr: CheetahString,
669        special_topic: bool,
670        timeout_millis: u64,
671    ) -> rocketmq_error::RocketMQResult<TopicConfigSerializeWrapper> {
672        todo!()
673    }
674
675    async fn update_consume_offset(
676        &self,
677        broker_addr: CheetahString,
678        consume_group: CheetahString,
679        mq: MessageQueue,
680        offset: u64,
681    ) -> rocketmq_error::RocketMQResult<()> {
682        todo!()
683    }
684
685    async fn update_name_server_config(
686        &self,
687        properties: HashMap<CheetahString, CheetahString>,
688        name_servers: Option<Vec<CheetahString>>,
689    ) -> rocketmq_error::RocketMQResult<()> {
690        self.client_instance
691            .as_ref()
692            .unwrap()
693            .get_mq_client_api_impl()
694            .update_name_server_config(
695                properties,
696                name_servers,
697                self.timeout_millis.as_millis() as u64,
698            )
699            .await
700    }
701
702    async fn get_name_server_config(
703        &self,
704        name_servers: Vec<CheetahString>,
705    ) -> rocketmq_error::RocketMQResult<HashMap<CheetahString, HashMap<CheetahString, CheetahString>>>
706    {
707        Ok(self
708            .client_instance
709            .as_ref()
710            .unwrap()
711            .mq_client_api_impl
712            .as_ref()
713            .unwrap()
714            .get_name_server_config(Some(name_servers), self.timeout_millis)
715            .await?
716            .unwrap_or_default())
717    }
718
719    async fn resume_check_half_message(
720        &self,
721        topic: CheetahString,
722        msg_id: CheetahString,
723    ) -> rocketmq_error::RocketMQResult<bool> {
724        todo!()
725    }
726
727    async fn set_message_request_mode(
728        &self,
729        broker_addr: CheetahString,
730        topic: CheetahString,
731        consumer_group: CheetahString,
732        mode: MessageRequestMode,
733        pop_work_group_size: i32,
734        timeout_millis: u64,
735    ) -> rocketmq_error::RocketMQResult<()> {
736        let mut mq_client_api = self
737            .client_instance
738            .as_ref()
739            .unwrap()
740            .get_mq_client_api_impl();
741        match mq_client_api
742            .set_message_request_mode(
743                &broker_addr,
744                &topic,
745                &consumer_group,
746                mode,
747                pop_work_group_size,
748                timeout_millis,
749            )
750            .await
751        {
752            Ok(_) => Ok(()),
753            Err(e) => Err(e),
754        }
755    }
756
757    async fn reset_offset_by_queue_id(
758        &self,
759        broker_addr: CheetahString,
760        consumer_group: CheetahString,
761        topic_name: CheetahString,
762        queue_id: i32,
763        reset_offset: u64,
764    ) -> rocketmq_error::RocketMQResult<()> {
765        todo!()
766    }
767
768    async fn examine_topic_config(
769        &self,
770        addr: CheetahString,
771        topic: CheetahString,
772    ) -> rocketmq_error::RocketMQResult<TopicConfig> {
773        todo!()
774    }
775
776    async fn create_static_topic(
777        &self,
778        addr: CheetahString,
779        default_topic: CheetahString,
780        topic_config: TopicConfig,
781        mapping_detail: TopicQueueMappingDetail,
782        force: bool,
783    ) -> rocketmq_error::RocketMQResult<()> {
784        todo!()
785    }
786
787    async fn reset_master_flush_offset(
788        &self,
789        broker_addr: CheetahString,
790        master_flush_offset: u64,
791    ) -> rocketmq_error::RocketMQResult<()> {
792        todo!()
793    }
794
795    async fn get_controller_config(
796        &self,
797        controller_servers: Vec<CheetahString>,
798    ) -> rocketmq_error::RocketMQResult<HashMap<CheetahString, HashMap<CheetahString, CheetahString>>>
799    {
800        todo!()
801    }
802
803    async fn update_controller_config(
804        &self,
805        properties: HashMap<CheetahString, CheetahString>,
806        controllers: Vec<CheetahString>,
807    ) -> rocketmq_error::RocketMQResult<()> {
808        todo!()
809    }
810
811    async fn clean_controller_broker_data(
812        &self,
813        controller_addr: CheetahString,
814        cluster_name: CheetahString,
815        broker_name: CheetahString,
816        broker_controller_ids_to_clean: Option<CheetahString>,
817        is_clean_living_broker: bool,
818    ) -> rocketmq_error::RocketMQResult<()> {
819        todo!()
820    }
821
822    async fn update_cold_data_flow_ctr_group_config(
823        &self,
824        broker_addr: CheetahString,
825        properties: HashMap<CheetahString, CheetahString>,
826    ) -> rocketmq_error::RocketMQResult<()> {
827        todo!()
828    }
829
830    async fn remove_cold_data_flow_ctr_group_config(
831        &self,
832        broker_addr: CheetahString,
833        consumer_group: CheetahString,
834    ) -> rocketmq_error::RocketMQResult<()> {
835        todo!()
836    }
837
838    async fn get_cold_data_flow_ctr_info(
839        &self,
840        broker_addr: CheetahString,
841    ) -> rocketmq_error::RocketMQResult<CheetahString> {
842        todo!()
843    }
844
845    async fn set_commit_log_read_ahead_mode(
846        &self,
847        broker_addr: CheetahString,
848        mode: CheetahString,
849    ) -> rocketmq_error::RocketMQResult<CheetahString> {
850        todo!()
851    }
852
853    async fn create_user(
854        &self,
855        broker_addr: CheetahString,
856        username: CheetahString,
857        password: CheetahString,
858        user_type: CheetahString,
859    ) -> rocketmq_error::RocketMQResult<()> {
860        todo!()
861    }
862
863    async fn update_user(
864        &self,
865        broker_addr: CheetahString,
866        username: CheetahString,
867        password: CheetahString,
868        user_type: CheetahString,
869        user_status: CheetahString,
870    ) -> rocketmq_error::RocketMQResult<()> {
871        todo!()
872    }
873
874    async fn delete_user(
875        &self,
876        broker_addr: CheetahString,
877        username: CheetahString,
878    ) -> rocketmq_error::RocketMQResult<()> {
879        todo!()
880    }
881
882    async fn create_acl(
883        &self,
884        broker_addr: CheetahString,
885        subject: CheetahString,
886        resources: Vec<CheetahString>,
887        actions: Vec<CheetahString>,
888        source_ips: Vec<CheetahString>,
889        decision: CheetahString,
890    ) -> rocketmq_error::RocketMQResult<()> {
891        todo!()
892    }
893
894    async fn update_acl(
895        &self,
896        broker_addr: CheetahString,
897        subject: CheetahString,
898        resources: Vec<CheetahString>,
899        actions: Vec<CheetahString>,
900        source_ips: Vec<CheetahString>,
901        decision: CheetahString,
902    ) -> rocketmq_error::RocketMQResult<()> {
903        todo!()
904    }
905
906    async fn delete_acl(
907        &self,
908        broker_addr: CheetahString,
909        subject: CheetahString,
910        resource: CheetahString,
911    ) -> rocketmq_error::RocketMQResult<()> {
912        todo!()
913    }
914}