1#![allow(dead_code)]
19use std::collections::HashMap;
20use std::collections::HashSet;
21use std::env;
22use std::sync::Arc;
23use std::time::Duration;
24
25use cheetah_string::CheetahString;
26use lazy_static::lazy_static;
27use rocketmq_common::common::base::plain_access_config::PlainAccessConfig;
28use rocketmq_common::common::base::service_state::ServiceState;
29use rocketmq_common::common::config::TopicConfig;
30use rocketmq_common::common::message::message_enum::MessageRequestMode;
31use rocketmq_common::common::message::message_queue::MessageQueue;
32use rocketmq_common::common::mix_all;
33use rocketmq_common::common::FAQUrl;
34use rocketmq_remoting::protocol::admin::consume_stats::ConsumeStats;
35use rocketmq_remoting::protocol::admin::topic_stats_table::TopicStatsTable;
36use rocketmq_remoting::protocol::body::broker_body::cluster_info::ClusterInfo;
37use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;
38use rocketmq_remoting::protocol::body::consumer_connection::ConsumerConnection;
39use rocketmq_remoting::protocol::body::consumer_running_info::ConsumerRunningInfo;
40use rocketmq_remoting::protocol::body::group_list::GroupList;
41use rocketmq_remoting::protocol::body::kv_table::KVTable;
42use rocketmq_remoting::protocol::body::producer_connection::ProducerConnection;
43use rocketmq_remoting::protocol::body::topic::topic_list::TopicList;
44use rocketmq_remoting::protocol::body::topic_info_wrapper::TopicConfigSerializeWrapper;
45use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
46use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData;
47use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_detail::TopicQueueMappingDetail;
48use rocketmq_remoting::protocol::subscription::subscription_group_config::SubscriptionGroupConfig;
49use rocketmq_remoting::runtime::RPCHook;
50use rocketmq_rust::ArcMut;
51use tracing::info;
52
53use crate::admin::mq_admin_ext_async::MQAdminExt;
54use crate::admin::mq_admin_ext_async_inner::MQAdminExtInnerImpl;
55use crate::base::client_config::ClientConfig;
56use crate::common::admin_tool_result::AdminToolResult;
57use crate::factory::mq_client_instance::MQClientInstance;
58use crate::implementation::mq_client_manager::MQClientManager;
59
60lazy_static! {
61 static ref SYSTEM_GROUP_SET: HashSet<CheetahString> = {
62 let mut set = HashSet::new();
63 set.insert(CheetahString::from(mix_all::DEFAULT_CONSUMER_GROUP));
64 set.insert(CheetahString::from(mix_all::DEFAULT_PRODUCER_GROUP));
65 set.insert(CheetahString::from(mix_all::TOOLS_CONSUMER_GROUP));
66 set.insert(CheetahString::from(mix_all::SCHEDULE_CONSUMER_GROUP));
67 set.insert(CheetahString::from(mix_all::FILTERSRV_CONSUMER_GROUP));
68 set.insert(CheetahString::from(mix_all::MONITOR_CONSUMER_GROUP));
69 set.insert(CheetahString::from(mix_all::CLIENT_INNER_PRODUCER_GROUP));
70 set.insert(CheetahString::from(mix_all::SELF_TEST_PRODUCER_GROUP));
71 set.insert(CheetahString::from(mix_all::SELF_TEST_CONSUMER_GROUP));
72 set.insert(CheetahString::from(mix_all::ONS_HTTP_PROXY_GROUP));
73 set.insert(CheetahString::from(mix_all::CID_ONSAPI_PERMISSION_GROUP));
74 set.insert(CheetahString::from(mix_all::CID_ONSAPI_OWNER_GROUP));
75 set.insert(CheetahString::from(mix_all::CID_ONSAPI_PULL_GROUP));
76 set.insert(CheetahString::from(mix_all::CID_SYS_RMQ_TRANS));
77 set
78 };
79}
80
/// Name of the environment variable that `start` reads as a fallback when the
/// configured SOCKS proxy setting is still the empty JSON object `"{}"`.
const SOCKS_PROXY_JSON: &str = "socksProxyJson";
/// KV namespace that `DefaultMQAdminExtImpl::new` seeds into
/// `kv_namespace_to_delete_list`.
const NAMESPACE_ORDER_TOPIC_CONFIG: &str = "ORDER_TOPIC_CONFIG";
/// State of the admin-tool client that implements [`MQAdminExt`] in this file.
pub struct DefaultMQAdminExtImpl {
    /// Lifecycle state; `start` and `shutdown` branch on it and update it.
    service_state: ServiceState,
    /// Client instance used for all remote calls; `None` until `start` obtains
    /// one from `MQClientManager`.
    client_instance: Option<ArcMut<MQClientInstance>>,
    /// Optional RPC hook handed to `MQClientManager` when the client instance
    /// is obtained.
    rpc_hook: Option<Arc<dyn RPCHook>>,
    /// Timeout for remote calls; most call sites convert it to whole
    /// milliseconds (`as_millis() as u64`).
    timeout_millis: Duration,
    /// Seeded by `new` with `NAMESPACE_ORDER_TOPIC_CONFIG`; not read by the
    /// code visible in this file.
    kv_namespace_to_delete_list: Vec<CheetahString>,
    /// Client configuration; `start` adjusts its instance name and SOCKS proxy
    /// setting before cloning it for the client instance.
    client_config: ArcMut<ClientConfig>,
    /// Group name under which `start` registers this admin client and
    /// `shutdown` unregisters it.
    admin_ext_group: CheetahString,
    /// Handle installed through `set_inner`; `start` wraps a clone of it in
    /// `MQAdminExtInnerImpl`. NOTE(review): presumably points back at this same
    /// instance — confirm against the caller.
    inner: Option<ArcMut<DefaultMQAdminExtImpl>>,
}
93
94impl DefaultMQAdminExtImpl {
95 pub fn new(
96 rpc_hook: Option<Arc<dyn RPCHook>>,
97 timeout_millis: Duration,
98 client_config: ArcMut<ClientConfig>,
99 admin_ext_group: CheetahString,
100 ) -> Self {
101 DefaultMQAdminExtImpl {
102 service_state: ServiceState::CreateJust,
103 client_instance: None,
104 rpc_hook,
105 timeout_millis,
106 kv_namespace_to_delete_list: vec![CheetahString::from_static_str(
107 NAMESPACE_ORDER_TOPIC_CONFIG,
108 )],
109 client_config,
110 admin_ext_group,
111 inner: None,
112 }
113 }
114
115 pub fn set_inner(&mut self, inner: ArcMut<DefaultMQAdminExtImpl>) {
116 self.inner = Some(inner);
117 }
118}
119
120#[allow(unused_variables)]
121#[allow(unused_mut)]
122impl MQAdminExt for DefaultMQAdminExtImpl {
123 async fn start(&mut self) -> rocketmq_error::RocketMQResult<()> {
124 match self.service_state {
125 ServiceState::CreateJust => {
126 self.service_state = ServiceState::StartFailed;
127 self.client_config.change_instance_name_to_pid();
128 if "{}".eq(&self.client_config.socks_proxy_config) {
129 self.client_config.socks_proxy_config = env::var(SOCKS_PROXY_JSON)
130 .unwrap_or_else(|_| "{}".to_string())
131 .into();
132 }
133 self.client_instance = Some(
134 MQClientManager::get_instance().get_or_create_mq_client_instance(
135 self.client_config.as_ref().clone(),
136 self.rpc_hook.clone(),
137 ),
138 );
139
140 let group = &self.admin_ext_group.clone();
141 let register_ok = self
142 .client_instance
143 .as_mut()
144 .unwrap()
145 .register_admin_ext(
146 group,
147 MQAdminExtInnerImpl {
148 inner: self.inner.as_ref().unwrap().clone(),
149 },
150 )
151 .await;
152 if !register_ok {
153 self.service_state = ServiceState::StartFailed;
154 return Err(rocketmq_error::RocketMQError::illegal_argument(format!(
155 "The adminExt group[{}] has created already, specified another name \
156 please.{}",
157 self.admin_ext_group,
158 FAQUrl::suggest_todo(FAQUrl::GROUP_NAME_DUPLICATE_URL)
159 )));
160 }
161 let arc_mut = self.client_instance.clone().unwrap();
162 self.client_instance
163 .as_mut()
164 .unwrap()
165 .start(arc_mut)
166 .await?;
167 self.service_state = ServiceState::Running;
168 info!("the adminExt [{}] start OK", self.admin_ext_group);
169 Ok(())
170 }
171 ServiceState::Running | ServiceState::ShutdownAlready | ServiceState::StartFailed => {
172 unimplemented!()
173 }
174 }
175 }
176
177 async fn shutdown(&mut self) {
178 match self.service_state {
179 ServiceState::CreateJust
180 | ServiceState::ShutdownAlready
181 | ServiceState::StartFailed => {
182 }
184 ServiceState::Running => {
185 let instance = self.client_instance.as_mut().unwrap();
186 instance.unregister_admin_ext(&self.admin_ext_group).await;
187 instance.shutdown().await;
188 self.service_state = ServiceState::ShutdownAlready;
189 }
190 }
191 }
192
    /// Placeholder for the `add_broker_to_container` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn add_broker_to_container(
        &self,
        broker_container_addr: CheetahString,
        broker_config: CheetahString,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
200
    /// Placeholder for the `remove_broker_from_container` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn remove_broker_from_container(
        &self,
        broker_container_addr: CheetahString,
        cluster_name: CheetahString,
        broker_name: CheetahString,
        broker_id: u64,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
210
    /// Placeholder for the `update_broker_config` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn update_broker_config(
        &self,
        broker_addr: CheetahString,
        properties: HashMap<CheetahString, CheetahString>,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
218
    /// Placeholder for the `get_broker_config` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn get_broker_config(
        &self,
        broker_addr: CheetahString,
    ) -> rocketmq_error::RocketMQResult<HashMap<CheetahString, CheetahString>> {
        todo!()
    }
225
    /// Placeholder for the `create_and_update_topic_config` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn create_and_update_topic_config(
        &self,
        addr: CheetahString,
        config: TopicConfig,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
233
    /// Placeholder for the `create_and_update_topic_config_list` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn create_and_update_topic_config_list(
        &self,
        addr: CheetahString,
        topic_config_list: Vec<TopicConfig>,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
241
    /// Placeholder for the `create_and_update_plain_access_config` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn create_and_update_plain_access_config(
        &self,
        addr: CheetahString,
        config: PlainAccessConfig,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
249
    /// Placeholder for the `delete_plain_access_config` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn delete_plain_access_config(
        &self,
        addr: CheetahString,
        access_key: CheetahString,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
257
    /// Placeholder for the `update_global_white_addr_config` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn update_global_white_addr_config(
        &self,
        addr: CheetahString,
        global_white_addrs: CheetahString,
        acl_file_full_path: Option<CheetahString>,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
266
    /// Placeholder for the `examine_broker_cluster_acl_version_info` operation of
    /// [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn examine_broker_cluster_acl_version_info(
        &self,
        addr: CheetahString,
    ) -> rocketmq_error::RocketMQResult<CheetahString> {
        todo!()
    }
273
    /// Placeholder for the `create_and_update_subscription_group_config` operation of
    /// [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn create_and_update_subscription_group_config(
        &self,
        addr: CheetahString,
        config: SubscriptionGroupConfig,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
281
    /// Placeholder for the `create_and_update_subscription_group_config_list` operation of
    /// [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn create_and_update_subscription_group_config_list(
        &self,
        broker_addr: CheetahString,
        configs: Vec<SubscriptionGroupConfig>,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
289
    /// Placeholder for the `examine_subscription_group_config` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn examine_subscription_group_config(
        &self,
        addr: CheetahString,
        group: CheetahString,
    ) -> rocketmq_error::RocketMQResult<SubscriptionGroupConfig> {
        todo!()
    }
297
    /// Placeholder for the `examine_topic_stats` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn examine_topic_stats(
        &self,
        topic: CheetahString,
        broker_addr: Option<CheetahString>,
    ) -> rocketmq_error::RocketMQResult<TopicStatsTable> {
        todo!()
    }
305
    /// Placeholder for the `examine_topic_stats_concurrent` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn examine_topic_stats_concurrent(
        &self,
        topic: CheetahString,
    ) -> AdminToolResult<TopicStatsTable> {
        todo!()
    }
312
    /// Placeholder for the `fetch_all_topic_list` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn fetch_all_topic_list(&self) -> rocketmq_error::RocketMQResult<TopicList> {
        todo!()
    }
316
    /// Placeholder for the `fetch_topics_by_cluster` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn fetch_topics_by_cluster(
        &self,
        cluster_name: CheetahString,
    ) -> rocketmq_error::RocketMQResult<TopicList> {
        todo!()
    }
323
    /// Placeholder for the `fetch_broker_runtime_stats` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn fetch_broker_runtime_stats(
        &self,
        broker_addr: CheetahString,
    ) -> rocketmq_error::RocketMQResult<KVTable> {
        todo!()
    }
330
    /// Placeholder for the `examine_consume_stats` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn examine_consume_stats(
        &self,
        consumer_group: CheetahString,
        topic: Option<CheetahString>,
        cluster_name: Option<CheetahString>,
        broker_addr: Option<CheetahString>,
        timeout_millis: Option<u64>,
    ) -> rocketmq_error::RocketMQResult<ConsumeStats> {
        todo!()
    }
341
342 async fn examine_broker_cluster_info(&self) -> rocketmq_error::RocketMQResult<ClusterInfo> {
343 self.client_instance
344 .as_ref()
345 .unwrap()
346 .get_mq_client_api_impl()
347 .get_broker_cluster_info(self.timeout_millis.as_millis() as u64)
348 .await
349 }
350
351 async fn examine_topic_route_info(
352 &self,
353 topic: CheetahString,
354 ) -> rocketmq_error::RocketMQResult<Option<TopicRouteData>> {
355 self.client_instance
356 .as_ref()
357 .unwrap()
358 .mq_client_api_impl
359 .as_ref()
360 .unwrap()
361 .get_topic_route_info_from_name_server(&topic, self.timeout_millis.as_millis() as u64)
362 .await
363 }
364
    /// Placeholder for the `examine_consumer_connection_info` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn examine_consumer_connection_info(
        &self,
        consumer_group: CheetahString,
        broker_addr: Option<CheetahString>,
    ) -> rocketmq_error::RocketMQResult<ConsumerConnection> {
        todo!()
    }
372
    /// Placeholder for the `examine_producer_connection_info` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn examine_producer_connection_info(
        &self,
        producer_group: CheetahString,
        topic: CheetahString,
    ) -> rocketmq_error::RocketMQResult<ProducerConnection> {
        todo!()
    }
380
381 async fn get_name_server_address_list(&self) -> Vec<CheetahString> {
382 self.client_instance
383 .as_ref()
384 .unwrap()
385 .get_mq_client_api_impl()
386 .get_name_server_address_list()
387 .to_vec()
388 }
389
390 async fn wipe_write_perm_of_broker(
391 &self,
392 namesrv_addr: CheetahString,
393 broker_name: CheetahString,
394 ) -> rocketmq_error::RocketMQResult<i32> {
395 self.client_instance
396 .as_ref()
397 .unwrap()
398 .get_mq_client_api_impl()
399 .wipe_write_perm_of_broker(
400 namesrv_addr,
401 broker_name,
402 self.timeout_millis.as_millis() as u64,
403 )
404 .await
405 }
406
407 async fn add_write_perm_of_broker(
408 &self,
409 namesrv_addr: CheetahString,
410 broker_name: CheetahString,
411 ) -> rocketmq_error::RocketMQResult<i32> {
412 self.client_instance
413 .as_ref()
414 .unwrap()
415 .get_mq_client_api_impl()
416 .add_write_perm_of_broker(
417 namesrv_addr,
418 broker_name,
419 self.timeout_millis.as_millis() as u64,
420 )
421 .await
422 }
423
    /// Placeholder for the `put_kv_config` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn put_kv_config(
        &self,
        namespace: CheetahString,
        key: CheetahString,
        value: CheetahString,
    ) {
        todo!()
    }
432
    /// Placeholder for the `get_kv_config` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn get_kv_config(
        &self,
        namespace: CheetahString,
        key: CheetahString,
    ) -> rocketmq_error::RocketMQResult<CheetahString> {
        todo!()
    }
440
    /// Placeholder for the `get_kv_list_by_namespace` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn get_kv_list_by_namespace(
        &self,
        namespace: CheetahString,
    ) -> rocketmq_error::RocketMQResult<KVTable> {
        todo!()
    }
447
    /// Placeholder for the `delete_topic` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn delete_topic(
        &self,
        topic_name: CheetahString,
        cluster_name: CheetahString,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
455
    /// Placeholder for the `delete_topic_in_broker` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn delete_topic_in_broker(
        &self,
        addrs: HashSet<CheetahString>,
        topic: CheetahString,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
463
    /// Placeholder for the `delete_topic_in_name_server` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn delete_topic_in_name_server(
        &self,
        addrs: HashSet<CheetahString>,
        cluster_name: Option<CheetahString>,
        topic: CheetahString,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
472
    /// Placeholder for the `delete_subscription_group` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn delete_subscription_group(
        &self,
        addr: CheetahString,
        group_name: CheetahString,
        remove_offset: Option<bool>,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
481
482 async fn create_and_update_kv_config(
483 &self,
484 namespace: CheetahString,
485 key: CheetahString,
486 value: CheetahString,
487 ) -> rocketmq_error::RocketMQResult<()> {
488 self.client_instance
489 .as_ref()
490 .unwrap()
491 .get_mq_client_api_impl()
492 .put_kvconfig_value(
493 namespace,
494 key,
495 value,
496 self.timeout_millis.as_millis() as u64,
497 )
498 .await
499 }
500
501 async fn delete_kv_config(
502 &self,
503 namespace: CheetahString,
504 key: CheetahString,
505 ) -> rocketmq_error::RocketMQResult<()> {
506 self.client_instance
507 .as_ref()
508 .unwrap()
509 .get_mq_client_api_impl()
510 .delete_kvconfig_value(namespace, key, self.timeout_millis.as_millis() as u64)
511 .await
512 }
513
    /// Placeholder for the `reset_offset_by_timestamp` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn reset_offset_by_timestamp(
        &self,
        cluster_name: Option<CheetahString>,
        topic: CheetahString,
        group: CheetahString,
        timestamp: u64,
        is_force: bool,
    ) -> rocketmq_error::RocketMQResult<HashMap<MessageQueue, u64>> {
        todo!()
    }
524
    /// Placeholder for the `reset_offset_new` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn reset_offset_new(
        &self,
        consumer_group: CheetahString,
        topic: CheetahString,
        timestamp: u64,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
533
    /// Placeholder for the `get_consume_status` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn get_consume_status(
        &self,
        topic: CheetahString,
        group: CheetahString,
        client_addr: CheetahString,
    ) -> rocketmq_error::RocketMQResult<HashMap<CheetahString, HashMap<MessageQueue, u64>>> {
        todo!()
    }
542
    /// Placeholder for the `create_or_update_order_conf` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn create_or_update_order_conf(
        &self,
        key: CheetahString,
        value: CheetahString,
        is_cluster: bool,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
551
    /// Placeholder for the `query_topic_consume_by_who` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn query_topic_consume_by_who(
        &self,
        topic: CheetahString,
    ) -> rocketmq_error::RocketMQResult<GroupList> {
        todo!()
    }
558
    /// Placeholder for the `query_topics_by_consumer` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn query_topics_by_consumer(
        &self,
        group: CheetahString,
    ) -> rocketmq_error::RocketMQResult<TopicList> {
        todo!()
    }
565
    /// Placeholder for the `query_topics_by_consumer_concurrent` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn query_topics_by_consumer_concurrent(
        &self,
        group: CheetahString,
    ) -> AdminToolResult<TopicList> {
        todo!()
    }
572
    /// Placeholder for the `query_subscription` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn query_subscription(
        &self,
        group: CheetahString,
        topic: CheetahString,
    ) -> rocketmq_error::RocketMQResult<SubscriptionData> {
        todo!()
    }
580
    /// Placeholder for the `clean_expired_consumer_queue` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn clean_expired_consumer_queue(
        &self,
        cluster: Option<CheetahString>,
        addr: Option<CheetahString>,
    ) -> rocketmq_error::RocketMQResult<bool> {
        todo!()
    }
588
    /// Placeholder for the `delete_expired_commit_log` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn delete_expired_commit_log(
        &self,
        cluster: Option<CheetahString>,
        addr: Option<CheetahString>,
    ) -> rocketmq_error::RocketMQResult<bool> {
        todo!()
    }
596
    /// Placeholder for the `clean_unused_topic` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn clean_unused_topic(
        &self,
        cluster: Option<CheetahString>,
        addr: Option<CheetahString>,
    ) -> rocketmq_error::RocketMQResult<bool> {
        todo!()
    }
604
    /// Placeholder for the `get_consumer_running_info` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn get_consumer_running_info(
        &self,
        consumer_group: CheetahString,
        client_id: CheetahString,
        jstack: bool,
        metrics: Option<bool>,
    ) -> rocketmq_error::RocketMQResult<ConsumerRunningInfo> {
        todo!()
    }
614
    /// Placeholder for the `consume_message_directly` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn consume_message_directly(
        &self,
        consumer_group: CheetahString,
        client_id: CheetahString,
        topic: CheetahString,
        msg_id: CheetahString,
    ) -> rocketmq_error::RocketMQResult<ConsumeMessageDirectlyResult> {
        todo!()
    }
624
    /// Placeholder for the `consume_message_directly_ext` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn consume_message_directly_ext(
        &self,
        cluster_name: CheetahString,
        consumer_group: CheetahString,
        client_id: CheetahString,
        topic: CheetahString,
        msg_id: CheetahString,
    ) -> rocketmq_error::RocketMQResult<ConsumeMessageDirectlyResult> {
        todo!()
    }
635
    /// Placeholder for the `clone_group_offset` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn clone_group_offset(
        &self,
        src_group: CheetahString,
        dest_group: CheetahString,
        topic: CheetahString,
        is_offline: bool,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
645
    /// Placeholder for the `get_cluster_list` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn get_cluster_list(
        &self,
        topic: String,
    ) -> rocketmq_error::RocketMQResult<HashSet<CheetahString>> {
        todo!()
    }
652
653 async fn get_topic_cluster_list(
654 &self,
655 topic: String,
656 ) -> rocketmq_error::RocketMQResult<HashSet<CheetahString>> {
657 let cluster_info = self.examine_broker_cluster_info().await?;
658 let topic_route_data = self.examine_topic_route_info(topic.into()).await?.unwrap();
659 let broker_data = topic_route_data
660 .broker_datas
661 .first()
662 .ok_or_else(|| mq_client_err!("Broker datas is empty"))?;
663 let mut cluster_set = HashSet::new();
664 let broker_name = broker_data.broker_name();
665 if let Some(cluster_addr_table) = cluster_info.cluster_addr_table.as_ref() {
666 cluster_set.extend(
667 cluster_addr_table
668 .iter()
669 .filter(|(cluster_name, broker_names)| broker_names.contains(broker_name))
670 .map(|(cluster_name, broker_names)| cluster_name.clone()),
671 );
672 }
673 Ok(cluster_set)
674 }
675
    /// Placeholder for the `get_all_topic_config` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn get_all_topic_config(
        &self,
        broker_addr: CheetahString,
        timeout_millis: u64,
    ) -> rocketmq_error::RocketMQResult<TopicConfigSerializeWrapper> {
        todo!()
    }
683
    /// Placeholder for the `get_user_topic_config` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn get_user_topic_config(
        &self,
        broker_addr: CheetahString,
        special_topic: bool,
        timeout_millis: u64,
    ) -> rocketmq_error::RocketMQResult<TopicConfigSerializeWrapper> {
        todo!()
    }
692
    /// Placeholder for the `update_consume_offset` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn update_consume_offset(
        &self,
        broker_addr: CheetahString,
        consume_group: CheetahString,
        mq: MessageQueue,
        offset: u64,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
702
703 async fn update_name_server_config(
704 &self,
705 properties: HashMap<CheetahString, CheetahString>,
706 name_servers: Option<Vec<CheetahString>>,
707 ) -> rocketmq_error::RocketMQResult<()> {
708 self.client_instance
709 .as_ref()
710 .unwrap()
711 .get_mq_client_api_impl()
712 .update_name_server_config(
713 properties,
714 name_servers,
715 self.timeout_millis.as_millis() as u64,
716 )
717 .await
718 }
719
720 async fn get_name_server_config(
721 &self,
722 name_servers: Vec<CheetahString>,
723 ) -> rocketmq_error::RocketMQResult<HashMap<CheetahString, HashMap<CheetahString, CheetahString>>>
724 {
725 Ok(self
726 .client_instance
727 .as_ref()
728 .unwrap()
729 .mq_client_api_impl
730 .as_ref()
731 .unwrap()
732 .get_name_server_config(Some(name_servers), self.timeout_millis)
733 .await?
734 .unwrap_or_default())
735 }
736
    /// Placeholder for the `resume_check_half_message` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn resume_check_half_message(
        &self,
        topic: CheetahString,
        msg_id: CheetahString,
    ) -> rocketmq_error::RocketMQResult<bool> {
        todo!()
    }
744
745 async fn set_message_request_mode(
746 &self,
747 broker_addr: CheetahString,
748 topic: CheetahString,
749 consumer_group: CheetahString,
750 mode: MessageRequestMode,
751 pop_work_group_size: i32,
752 timeout_millis: u64,
753 ) -> rocketmq_error::RocketMQResult<()> {
754 let mut mq_client_api = self
755 .client_instance
756 .as_ref()
757 .unwrap()
758 .get_mq_client_api_impl();
759 match mq_client_api
760 .set_message_request_mode(
761 &broker_addr,
762 &topic,
763 &consumer_group,
764 mode,
765 pop_work_group_size,
766 timeout_millis,
767 )
768 .await
769 {
770 Ok(_) => Ok(()),
771 Err(e) => Err(e),
772 }
773 }
774
    /// Placeholder for the `reset_offset_by_queue_id` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn reset_offset_by_queue_id(
        &self,
        broker_addr: CheetahString,
        consumer_group: CheetahString,
        topic_name: CheetahString,
        queue_id: i32,
        reset_offset: u64,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
785
    /// Placeholder for the `examine_topic_config` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn examine_topic_config(
        &self,
        addr: CheetahString,
        topic: CheetahString,
    ) -> rocketmq_error::RocketMQResult<TopicConfig> {
        todo!()
    }
793
    /// Placeholder for the `create_static_topic` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn create_static_topic(
        &self,
        addr: CheetahString,
        default_topic: CheetahString,
        topic_config: TopicConfig,
        mapping_detail: TopicQueueMappingDetail,
        force: bool,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
804
    /// Placeholder for the `reset_master_flush_offset` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn reset_master_flush_offset(
        &self,
        broker_addr: CheetahString,
        master_flush_offset: u64,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
812
    /// Placeholder for the `get_controller_config` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn get_controller_config(
        &self,
        controller_servers: Vec<CheetahString>,
    ) -> rocketmq_error::RocketMQResult<HashMap<CheetahString, HashMap<CheetahString, CheetahString>>>
    {
        todo!()
    }
820
    /// Placeholder for the `update_controller_config` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn update_controller_config(
        &self,
        properties: HashMap<CheetahString, CheetahString>,
        controllers: Vec<CheetahString>,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
828
    /// Placeholder for the `clean_controller_broker_data` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn clean_controller_broker_data(
        &self,
        controller_addr: CheetahString,
        cluster_name: CheetahString,
        broker_name: CheetahString,
        broker_controller_ids_to_clean: Option<CheetahString>,
        is_clean_living_broker: bool,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
839
    /// Placeholder for the `update_cold_data_flow_ctr_group_config` operation of
    /// [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn update_cold_data_flow_ctr_group_config(
        &self,
        broker_addr: CheetahString,
        properties: HashMap<CheetahString, CheetahString>,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
847
    /// Placeholder for the `remove_cold_data_flow_ctr_group_config` operation of
    /// [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn remove_cold_data_flow_ctr_group_config(
        &self,
        broker_addr: CheetahString,
        consumer_group: CheetahString,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
855
    /// Placeholder for the `get_cold_data_flow_ctr_info` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn get_cold_data_flow_ctr_info(
        &self,
        broker_addr: CheetahString,
    ) -> rocketmq_error::RocketMQResult<CheetahString> {
        todo!()
    }
862
    /// Placeholder for the `set_commit_log_read_ahead_mode` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn set_commit_log_read_ahead_mode(
        &self,
        broker_addr: CheetahString,
        mode: CheetahString,
    ) -> rocketmq_error::RocketMQResult<CheetahString> {
        todo!()
    }
870
    /// Placeholder for the `create_user` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn create_user(
        &self,
        broker_addr: CheetahString,
        username: CheetahString,
        password: CheetahString,
        user_type: CheetahString,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
880
    /// Placeholder for the `update_user` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn update_user(
        &self,
        broker_addr: CheetahString,
        username: CheetahString,
        password: CheetahString,
        user_type: CheetahString,
        user_status: CheetahString,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
891
    /// Placeholder for the `delete_user` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn delete_user(
        &self,
        broker_addr: CheetahString,
        username: CheetahString,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
899
    /// Placeholder for the `create_acl` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn create_acl(
        &self,
        broker_addr: CheetahString,
        subject: CheetahString,
        resources: Vec<CheetahString>,
        actions: Vec<CheetahString>,
        source_ips: Vec<CheetahString>,
        decision: CheetahString,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
911
    /// Placeholder for the `update_acl` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn update_acl(
        &self,
        broker_addr: CheetahString,
        subject: CheetahString,
        resources: Vec<CheetahString>,
        actions: Vec<CheetahString>,
        source_ips: Vec<CheetahString>,
        decision: CheetahString,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
923
    /// Placeholder for the `delete_acl` operation of [`MQAdminExt`].
    ///
    /// # Panics
    ///
    /// Not implemented yet: the body is `todo!()`, so awaiting it always panics.
    async fn delete_acl(
        &self,
        broker_addr: CheetahString,
        subject: CheetahString,
        resource: CheetahString,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
932}