Skip to main content

rocketmq_client_rust/admin/
default_mq_admin_ext_impl.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(dead_code)]
16use std::collections::HashMap;
17use std::collections::HashSet;
18use std::env;
19use std::sync::Arc;
20use std::sync::OnceLock;
21use std::time::Duration;
22
23use crate::admin::mq_admin_ext_async::MQAdminExt;
24use crate::admin::mq_admin_ext_async_inner::MQAdminExtInnerImpl;
25use crate::base::client_config::ClientConfig;
26use crate::base::validators::Validators;
27use crate::common::admin_tool_result::AdminToolResult;
28use crate::common::admin_tools_result_code_enum::AdminToolsResultCodeEnum;
29use crate::consumer::consumer_impl::pull_request_ext::PullResultExt;
30use crate::consumer::pull_callback::PullCallback;
31use crate::consumer::pull_status::PullStatus;
32use crate::factory::mq_client_instance::MQClientInstance;
33use crate::implementation::communication_mode::CommunicationMode;
34use crate::implementation::mq_client_api_impl::MQClientAPIImpl;
35use crate::implementation::mq_client_manager::MQClientManager;
36use cheetah_string::CheetahString;
37use rand::seq::IndexedRandom;
38use rocketmq_common::common::attribute::attribute_parser::AttributeParser;
39use rocketmq_common::common::base::plain_access_config::PlainAccessConfig;
40use rocketmq_common::common::base::service_state::ServiceState;
41use rocketmq_common::common::config::TopicConfig;
42use rocketmq_common::common::constant::PermName;
43use rocketmq_common::common::message::message_decoder;
44use rocketmq_common::common::message::message_enum::MessageRequestMode;
45use rocketmq_common::common::message::message_ext::MessageExt;
46use rocketmq_common::common::message::message_queue::MessageQueue;
47use rocketmq_common::common::mix_all;
48use rocketmq_common::common::mix_all::DLQ_GROUP_TOPIC_PREFIX;
49use rocketmq_common::common::mix_all::RETRY_GROUP_TOPIC_PREFIX;
50use rocketmq_common::common::sys_flag::pull_sys_flag::PullSysFlag;
51#[allow(deprecated)]
52use rocketmq_common::common::tools::broker_operator_result::BrokerOperatorResult;
53#[allow(deprecated)]
54use rocketmq_common::common::tools::message_track::MessageTrack;
55#[allow(deprecated)]
56use rocketmq_common::common::tools::track_type::TrackType;
57use rocketmq_common::common::topic::TopicValidator;
58use rocketmq_common::common::FAQUrl;
59use rocketmq_error::RocketMQError;
60use rocketmq_remoting::code::response_code::ResponseCode;
61use rocketmq_remoting::protocol::admin::consume_stats::ConsumeStats;
62use rocketmq_remoting::protocol::admin::consume_stats_list::ConsumeStatsList;
63use rocketmq_remoting::protocol::admin::offset_wrapper::OffsetWrapper;
64use rocketmq_remoting::protocol::admin::rollback_stats::RollbackStats;
65use rocketmq_remoting::protocol::admin::topic_offset::TopicOffset;
66use rocketmq_remoting::protocol::admin::topic_stats_table::TopicStatsTable;
67use rocketmq_remoting::protocol::body::acl_info::AclInfo;
68use rocketmq_remoting::protocol::body::broker_body::broker_member_group::BrokerMemberGroup;
69use rocketmq_remoting::protocol::body::broker_body::cluster_info::ClusterInfo;
70use rocketmq_remoting::protocol::body::broker_replicas_info::BrokerReplicasInfo;
71use rocketmq_remoting::protocol::body::check_rocksdb_cqwrite_progress_response_body::CheckRocksdbCqWriteResult;
72use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;
73use rocketmq_remoting::protocol::body::consumer_connection::ConsumerConnection;
74use rocketmq_remoting::protocol::body::consumer_running_info::ConsumerRunningInfo;
75use rocketmq_remoting::protocol::body::epoch_entry_cache::EpochEntryCache;
76use rocketmq_remoting::protocol::body::get_broker_lite_info_response_body::GetBrokerLiteInfoResponseBody;
77use rocketmq_remoting::protocol::body::get_lite_client_info_response_body::GetLiteClientInfoResponseBody;
78use rocketmq_remoting::protocol::body::get_lite_group_info_response_body::GetLiteGroupInfoResponseBody;
79use rocketmq_remoting::protocol::body::get_lite_topic_info_response_body::GetLiteTopicInfoResponseBody;
80use rocketmq_remoting::protocol::body::get_parent_topic_info_response_body::GetParentTopicInfoResponseBody;
81use rocketmq_remoting::protocol::body::group_list::GroupList;
82use rocketmq_remoting::protocol::body::ha_runtime_info::HARuntimeInfo;
83use rocketmq_remoting::protocol::body::kv_table::KVTable;
84use rocketmq_remoting::protocol::body::producer_connection::ProducerConnection;
85use rocketmq_remoting::protocol::body::producer_table_info::ProducerTableInfo;
86use rocketmq_remoting::protocol::body::query_consume_queue_response_body::QueryConsumeQueueResponseBody;
87use rocketmq_remoting::protocol::body::queue_time_span::QueueTimeSpan;
88use rocketmq_remoting::protocol::body::subscription_group_wrapper::SubscriptionGroupWrapper;
89use rocketmq_remoting::protocol::body::topic::topic_list::TopicList;
90use rocketmq_remoting::protocol::body::topic_info_wrapper::TopicConfigSerializeWrapper;
91use rocketmq_remoting::protocol::body::user_info::UserInfo;
92use rocketmq_remoting::protocol::header::consume_message_directly_result_request_header::ConsumeMessageDirectlyResultRequestHeader;
93use rocketmq_remoting::protocol::header::create_topic_request_header::CreateTopicRequestHeader;
94use rocketmq_remoting::protocol::header::delete_topic_request_header::DeleteTopicRequestHeader;
95use rocketmq_remoting::protocol::header::elect_master_response_header::ElectMasterResponseHeader;
96use rocketmq_remoting::protocol::header::get_consume_stats_request_header::GetConsumeStatsRequestHeader;
97use rocketmq_remoting::protocol::header::get_meta_data_response_header::GetMetaDataResponseHeader;
98use rocketmq_remoting::protocol::header::get_topic_config_request_header::GetTopicConfigRequestHeader;
99use rocketmq_remoting::protocol::header::get_topic_stats_info_request_header::GetTopicStatsInfoRequestHeader;
100use rocketmq_remoting::protocol::header::namesrv::topic_operation_header::DeleteTopicFromNamesrvRequestHeader;
101use rocketmq_remoting::protocol::header::pull_message_request_header::PullMessageRequestHeader;
102use rocketmq_remoting::protocol::header::query_topic_consume_by_who_request_header::QueryTopicConsumeByWhoRequestHeader;
103use rocketmq_remoting::protocol::header::reset_offset_request_header::ResetOffsetRequestHeader;
104use rocketmq_remoting::protocol::header::update_consumer_offset_header::UpdateConsumerOffsetRequestHeader;
105use rocketmq_remoting::protocol::header::view_broker_stats_data_request_header::ViewBrokerStatsDataRequestHeader;
106use rocketmq_remoting::protocol::header::view_message_request_header::ViewMessageRequestHeader;
107use rocketmq_remoting::protocol::heartbeat::consume_type::ConsumeType;
108use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
109use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
110use rocketmq_remoting::protocol::route::route_data_view::QueueData;
111use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData;
112use rocketmq_remoting::protocol::static_topic::topic_config_and_queue_mapping::TopicConfigAndQueueMapping;
113use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_detail::TopicQueueMappingDetail;
114use rocketmq_remoting::protocol::subscription::broker_stats_data::BrokerStatsData;
115use rocketmq_remoting::protocol::subscription::group_forbidden::GroupForbidden;
116use rocketmq_remoting::protocol::subscription::subscription_group_config::SubscriptionGroupConfig;
117use rocketmq_remoting::runtime::RPCHook;
118use rocketmq_rust::ArcMut;
119use tracing::info;
120
121static SYSTEM_GROUP_SET: OnceLock<HashSet<CheetahString>> = OnceLock::new();
122
123fn get_system_group_set() -> &'static HashSet<CheetahString> {
124    SYSTEM_GROUP_SET.get_or_init(|| {
125        let mut set = HashSet::new();
126        set.insert(CheetahString::from(mix_all::DEFAULT_CONSUMER_GROUP));
127        set.insert(CheetahString::from(mix_all::DEFAULT_PRODUCER_GROUP));
128        set.insert(CheetahString::from(mix_all::TOOLS_CONSUMER_GROUP));
129        set.insert(CheetahString::from(mix_all::SCHEDULE_CONSUMER_GROUP));
130        set.insert(CheetahString::from(mix_all::FILTERSRV_CONSUMER_GROUP));
131        set.insert(CheetahString::from(mix_all::MONITOR_CONSUMER_GROUP));
132        set.insert(CheetahString::from(mix_all::CLIENT_INNER_PRODUCER_GROUP));
133        set.insert(CheetahString::from(mix_all::SELF_TEST_PRODUCER_GROUP));
134        set.insert(CheetahString::from(mix_all::SELF_TEST_CONSUMER_GROUP));
135        set.insert(CheetahString::from(mix_all::ONS_HTTP_PROXY_GROUP));
136        set.insert(CheetahString::from(mix_all::CID_ONSAPI_PERMISSION_GROUP));
137        set.insert(CheetahString::from(mix_all::CID_ONSAPI_OWNER_GROUP));
138        set.insert(CheetahString::from(mix_all::CID_ONSAPI_PULL_GROUP));
139        set.insert(CheetahString::from(mix_all::CID_SYS_RMQ_TRANS));
140        set
141    })
142}
143
/// Name of the environment variable that `start` reads for the SOCKS proxy JSON
/// when the client config still holds the `"{}"` placeholder.
144const SOCKS_PROXY_JSON: &str = "socksProxyJson";
/// Value that `DefaultMQAdminExtImpl::new` seeds `kv_namespace_to_delete_list` with.
145const NAMESPACE_ORDER_TOPIC_CONFIG: &str = "ORDER_TOPIC_CONFIG";
146
147fn encode_topic_attributes(attributes: &HashMap<CheetahString, CheetahString>) -> Option<CheetahString> {
148    if attributes.is_empty() {
149        return None;
150    }
151
152    let serialized = AttributeParser::parse_to_string(
153        &attributes
154            .iter()
155            .map(|(key, value)| (key.to_string(), value.to_string()))
156            .collect::<HashMap<String, String>>(),
157    );
158
159    if serialized.is_empty() {
160        None
161    } else {
162        Some(serialized.into())
163    }
164}
165
/// Admin client implementation: holds the lifecycle state, the shared client
/// instance and the settings used by the admin operations in this file.
166pub struct DefaultMQAdminExtImpl {
    /// Lifecycle state. `new` initialises it to `CreateJust`; `start` and `shutdown` update it.
167    service_state: ServiceState,
    /// Underlying client instance. `None` until `start` obtains one from `MQClientManager`.
168    client_instance: Option<ArcMut<MQClientInstance>>,
    /// Optional RPC hook handed to `MQClientManager` when the client instance is obtained.
169    rpc_hook: Option<Arc<dyn RPCHook>>,
    /// Request timeout. Stored as a `Duration` despite the name; call sites convert it with `as_millis()`.
170    timeout_millis: Duration,
    /// Seeded by `new` with `NAMESPACE_ORDER_TOPIC_CONFIG`; no reader is visible in this part of the file.
171    kv_namespace_to_delete_list: Vec<CheetahString>,
    /// Client configuration; `start` adjusts the instance name and the SOCKS proxy setting on it.
172    client_config: ArcMut<ClientConfig>,
    /// Group name used when registering and unregistering this admin client with the client instance.
173    admin_ext_group: CheetahString,
    /// Handle installed through `set_inner`; `start` wraps it in `MQAdminExtInnerImpl` when registering.
174    inner: Option<ArcMut<DefaultMQAdminExtImpl>>,
175}
176
177impl DefaultMQAdminExtImpl {
178    pub fn new(
179        rpc_hook: Option<Arc<dyn RPCHook>>,
180        timeout_millis: Duration,
181        client_config: ArcMut<ClientConfig>,
182        admin_ext_group: CheetahString,
183    ) -> Self {
184        DefaultMQAdminExtImpl {
185            service_state: ServiceState::CreateJust,
186            client_instance: None,
187            rpc_hook,
188            timeout_millis,
189            kv_namespace_to_delete_list: vec![CheetahString::from_static_str(NAMESPACE_ORDER_TOPIC_CONFIG)],
190            client_config,
191            admin_ext_group,
192            inner: None,
193        }
194    }
195
196    pub fn set_inner(&mut self, inner: ArcMut<DefaultMQAdminExtImpl>) {
197        self.inner = Some(inner);
198    }
199
200    pub fn has_inner(&self) -> bool {
201        self.inner.is_some()
202    }
203
204    pub async fn create_acl_with_acl_info(
205        &self,
206        broker_addr: CheetahString,
207        acl_info: AclInfo,
208    ) -> rocketmq_error::RocketMQResult<()> {
209        let subject = acl_info
210            .subject
211            .clone()
212            .ok_or_else(|| rocketmq_error::RocketMQError::IllegalArgument("ACL subject is required".into()))?;
213
214        if let Some(ref policies) = acl_info.policies {
215            for policy in policies {
216                if let Some(ref entries) = policy.entries {
217                    for entry in entries {
218                        let resources: Vec<CheetahString> =
219                            entry.resource.as_ref().map(|r| vec![r.clone()]).unwrap_or_default();
220
221                        let actions: Vec<CheetahString> = entry
222                            .actions
223                            .as_ref()
224                            .map(|a| a.split(',').map(|s| CheetahString::from(s.trim())).collect())
225                            .unwrap_or_default();
226
227                        let source_ips: Vec<CheetahString> = entry.source_ips.clone().unwrap_or_default();
228
229                        let decision: CheetahString = entry.decision.clone().unwrap_or_default();
230
231                        self.create_acl(
232                            broker_addr.clone(),
233                            subject.clone(),
234                            resources,
235                            actions,
236                            source_ips,
237                            decision,
238                        )
239                        .await?;
240                    }
241                }
242            }
243        }
244
245        Ok(())
246    }
247
248    pub async fn update_acl_with_acl_info(
249        &self,
250        broker_addr: CheetahString,
251        acl_info: AclInfo,
252    ) -> rocketmq_error::RocketMQResult<()> {
253        let subject = acl_info
254            .subject
255            .clone()
256            .ok_or_else(|| rocketmq_error::RocketMQError::IllegalArgument("ACL subject is required".into()))?;
257
258        if let Some(ref policies) = acl_info.policies {
259            for policy in policies {
260                if let Some(ref entries) = policy.entries {
261                    for entry in entries {
262                        let resources: Vec<CheetahString> =
263                            entry.resource.as_ref().map(|r| vec![r.clone()]).unwrap_or_default();
264
265                        let actions: Vec<CheetahString> = entry
266                            .actions
267                            .as_ref()
268                            .map(|a| a.split(',').map(|s| CheetahString::from(s.trim())).collect())
269                            .unwrap_or_default();
270
271                        let source_ips: Vec<CheetahString> = entry.source_ips.clone().unwrap_or_default();
272
273                        let decision: CheetahString = entry.decision.clone().unwrap_or_default();
274
275                        self.update_acl(
276                            broker_addr.clone(),
277                            subject.clone(),
278                            resources,
279                            actions,
280                            source_ips,
281                            decision,
282                        )
283                        .await?;
284                    }
285                }
286            }
287        }
288
289        Ok(())
290    }
291
292    pub async fn create_user_with_user_info(
293        &self,
294        broker_addr: CheetahString,
295        user_info: UserInfo,
296    ) -> rocketmq_error::RocketMQResult<()> {
297        let username = user_info
298            .username
299            .clone()
300            .ok_or_else(|| rocketmq_error::RocketMQError::IllegalArgument("User username is required".into()))?;
301
302        let password = user_info.password.clone().unwrap_or_default();
303        let user_type = user_info.user_type.clone().unwrap_or_default();
304
305        self.create_user(broker_addr, username, password, user_type).await
306    }
307
308    pub async fn update_user_with_user_info(
309        &self,
310        broker_addr: CheetahString,
311        user_info: UserInfo,
312    ) -> rocketmq_error::RocketMQResult<()> {
313        let username = user_info
314            .username
315            .clone()
316            .ok_or_else(|| rocketmq_error::RocketMQError::IllegalArgument("User username is required".into()))?;
317
318        let password = user_info.password.clone().unwrap_or_default();
319        let user_type = user_info.user_type.clone().unwrap_or_default();
320        let user_status = user_info.user_status.clone().unwrap_or_default();
321
322        self.update_user(broker_addr, username, password, user_type, user_status)
323            .await
324    }
325
326    pub async fn pull_message_from_queue(
327        &self,
328        broker_addr: &str,
329        mq: &MessageQueue,
330        sub_expression: &str,
331        offset: i64,
332        max_nums: i32,
333        timeout_millis: u64,
334    ) -> rocketmq_error::RocketMQResult<crate::consumer::pull_result::PullResult> {
335        let sys_flag = PullSysFlag::build_sys_flag(false, false, true, false);
336
337        let request_header = PullMessageRequestHeader {
338            consumer_group: CheetahString::from_static_str(mix_all::TOOLS_CONSUMER_GROUP),
339            topic: mq.topic().clone(),
340            queue_id: mq.queue_id(),
341            queue_offset: offset,
342            max_msg_nums: max_nums,
343            sys_flag: sys_flag as i32,
344            commit_offset: 0,
345            suspend_timeout_millis: 0,
346            sub_version: 0,
347            subscription: Some(CheetahString::from(sub_expression)),
348            expression_type: None,
349            max_msg_bytes: None,
350            request_source: None,
351            proxy_forward_client_id: None,
352            topic_request: None,
353        };
354
355        struct NoopPullCallback;
356        impl PullCallback for NoopPullCallback {
357            async fn on_success(&mut self, _pull_result: PullResultExt) {}
358            fn on_exception(&mut self, _e: Box<dyn std::error::Error + Send>) {}
359        }
360
361        let api_impl = self.client_instance.as_ref().unwrap().get_mq_client_api_impl();
362
363        let mut result = MQClientAPIImpl::pull_message(
364            api_impl,
365            CheetahString::from(broker_addr),
366            request_header,
367            timeout_millis,
368            CommunicationMode::Sync,
369            NoopPullCallback,
370        )
371        .await?
372        .ok_or_else(|| rocketmq_error::RocketMQError::Internal("pull_message returned None in sync mode".into()))?;
373
374        if result.pull_result.pull_status == PullStatus::Found {
375            if let Some(mut message_binary) = result.message_binary.take() {
376                let msg_vec = message_decoder::decodes_batch(&mut message_binary, true, true);
377                result.pull_result.msg_found_list = Some(msg_vec.into_iter().map(ArcMut::new).collect());
378            }
379        }
380
381        Ok(result.pull_result)
382    }
383
384    pub async fn query_message_by_key(
385        &self,
386        cluster_name: Option<CheetahString>,
387        topic: CheetahString,
388        key: CheetahString,
389        max_num: i32,
390        begin_timestamp: i64,
391        end_timestamp: i64,
392        _key_type: CheetahString,
393        _last_key: Option<CheetahString>,
394    ) -> rocketmq_error::RocketMQResult<crate::base::query_result::QueryResult> {
395        self.query_message_by_key_internal(cluster_name, topic, key, max_num, begin_timestamp, end_timestamp, false)
396            .await
397    }
398
399    pub async fn query_message_by_unique_key(
400        &self,
401        cluster_name: Option<CheetahString>,
402        topic: CheetahString,
403        unique_key: CheetahString,
404        max_num: i32,
405        begin_timestamp: i64,
406        end_timestamp: i64,
407    ) -> rocketmq_error::RocketMQResult<crate::base::query_result::QueryResult> {
408        self.query_message_by_key_internal(
409            cluster_name,
410            topic,
411            unique_key,
412            max_num,
413            begin_timestamp,
414            end_timestamp,
415            true,
416        )
417        .await
418    }
419
420    async fn query_message_by_key_internal(
421        &self,
422        cluster_name: Option<CheetahString>,
423        topic: CheetahString,
424        key: CheetahString,
425        max_num: i32,
426        begin_timestamp: i64,
427        end_timestamp: i64,
428        unique_key_flag: bool,
429    ) -> rocketmq_error::RocketMQResult<crate::base::query_result::QueryResult> {
430        let route_topic = cluster_name.unwrap_or_else(|| topic.clone());
431        let topic_route_data = self
432            .examine_topic_route_info(route_topic.clone())
433            .await?
434            .ok_or_else(|| {
435                rocketmq_error::RocketMQError::Internal(format!("Topic route not found for: {}", route_topic))
436            })?;
437
438        let mut message_list: Vec<MessageExt> = Vec::new();
439        let mut index_last_update_timestamp: u64 = 0;
440
441        let api_impl = self.client_instance.as_ref().unwrap().get_mq_client_api_impl();
442        let timeout = self.timeout_millis.as_millis() as u64;
443
444        for broker_data in &topic_route_data.broker_datas {
445            let broker_addr = match broker_data.select_broker_addr() {
446                Some(addr) => addr,
447                None => continue,
448            };
449
450            let request_header =
451                rocketmq_remoting::protocol::header::query_message_request_header::QueryMessageRequestHeader {
452                    topic: topic.clone(),
453                    key: key.clone(),
454                    max_num,
455                    begin_timestamp,
456                    end_timestamp,
457                    topic_request_header: None,
458                };
459
460            match MQClientAPIImpl::query_message(&api_impl, &broker_addr, request_header, unique_key_flag, timeout)
461                .await
462            {
463                Ok(Some((response_header, body))) => {
464                    if let Some(mut body_bytes) = body {
465                        let msgs = message_decoder::decodes_batch(&mut body_bytes, true, true);
466                        message_list.extend(msgs);
467                    }
468                    if response_header.index_last_update_timestamp as u64 > index_last_update_timestamp {
469                        index_last_update_timestamp = response_header.index_last_update_timestamp as u64;
470                    }
471                }
472                Ok(None) => {
473                    // No messages found on this broker, continue
474                }
475                Err(e) => {
476                    tracing::warn!("Failed to query message by key from broker {}: {}", broker_addr, e);
477                }
478            }
479        }
480
481        Ok(crate::base::query_result::QueryResult::new(
482            index_last_update_timestamp,
483            message_list,
484        ))
485    }
486}
487
488#[allow(unused_variables)]
489#[allow(unused_mut)]
490impl MQAdminExt for DefaultMQAdminExtImpl {
491    async fn start(&mut self) -> rocketmq_error::RocketMQResult<()> {
492        match self.service_state {
493            ServiceState::CreateJust => {
494                self.service_state = ServiceState::StartFailed;
495                self.client_config.change_instance_name_to_pid();
496                if "{}".eq(&self.client_config.socks_proxy_config) {
497                    self.client_config.socks_proxy_config =
498                        env::var(SOCKS_PROXY_JSON).unwrap_or_else(|_| "{}".to_string()).into();
499                }
500                self.client_instance = Some(
501                    MQClientManager::get_instance()
502                        .get_or_create_mq_client_instance(self.client_config.as_ref().clone(), self.rpc_hook.clone()),
503                );
504
505                let group = &self.admin_ext_group.clone();
506                let register_ok = self
507                    .client_instance
508                    .as_mut()
509                    .unwrap()
510                    .register_admin_ext(
511                        group,
512                        MQAdminExtInnerImpl {
513                            inner: self.inner.as_ref().unwrap().clone(),
514                        },
515                    )
516                    .await;
517                if !register_ok {
518                    self.service_state = ServiceState::StartFailed;
519                    return Err(rocketmq_error::RocketMQError::illegal_argument(format!(
520                        "The adminExt group[{}] has created already, specified another name please.{}",
521                        self.admin_ext_group,
522                        FAQUrl::suggest_todo(FAQUrl::GROUP_NAME_DUPLICATE_URL)
523                    )));
524                }
525                let arc_mut = self.client_instance.clone().unwrap();
526                self.client_instance.as_mut().unwrap().start(arc_mut).await?;
527                self.service_state = ServiceState::Running;
528                info!("the adminExt [{}] start OK", self.admin_ext_group);
529                Ok(())
530            }
531            ServiceState::Running | ServiceState::ShutdownAlready | ServiceState::StartFailed => {
532                unimplemented!()
533            }
534        }
535    }
536
537    async fn shutdown(&mut self) {
538        match self.service_state {
539            ServiceState::CreateJust | ServiceState::ShutdownAlready | ServiceState::StartFailed => {
540                // do nothing
541            }
542            ServiceState::Running => {
543                let instance = self.client_instance.as_mut().unwrap();
544                instance.unregister_admin_ext(&self.admin_ext_group).await;
545                instance.shutdown().await;
546                self.service_state = ServiceState::ShutdownAlready;
547            }
548        }
549    }
550
/// Stub: not implemented yet.
///
/// Judging by its name and parameters this is meant to add a broker (described by
/// `broker_config`) to the container at `broker_container_addr` — TODO confirm once implemented.
///
/// # Panics
///
/// The body is `todo!()`, so running this method always panics.
551    async fn add_broker_to_container(
552        &self,
553        broker_container_addr: CheetahString,
554        broker_config: CheetahString,
555    ) -> rocketmq_error::RocketMQResult<()> {
556        todo!()
557    }
558
/// Stub: not implemented yet.
///
/// Judging by its name and parameters this is meant to remove the broker identified by
/// `cluster_name` / `broker_name` / `broker_id` from the container at
/// `broker_container_addr` — TODO confirm once implemented.
///
/// # Panics
///
/// The body is `todo!()`, so running this method always panics.
559    async fn remove_broker_from_container(
560        &self,
561        broker_container_addr: CheetahString,
562        cluster_name: CheetahString,
563        broker_name: CheetahString,
564        broker_id: u64,
565    ) -> rocketmq_error::RocketMQResult<()> {
566        todo!()
567    }
568
569    async fn update_broker_config(
570        &self,
571        broker_addr: CheetahString,
572        properties: HashMap<CheetahString, CheetahString>,
573    ) -> rocketmq_error::RocketMQResult<()> {
574        let validator_input = properties
575            .iter()
576            .map(|(key, value)| (key.to_string(), value.to_string()))
577            .collect::<HashMap<String, String>>();
578        Validators::check_broker_config(&validator_input)?;
579
580        if let Some(ref mq_client_instance) = self.client_instance {
581            mq_client_instance
582                .get_mq_client_api_impl()
583                .update_broker_config(&broker_addr, properties, self.timeout_millis.as_millis() as u64)
584                .await
585        } else {
586            Err(rocketmq_error::RocketMQError::ClientNotStarted)
587        }
588    }
589
590    async fn get_broker_config(
591        &self,
592        broker_addr: CheetahString,
593    ) -> rocketmq_error::RocketMQResult<HashMap<CheetahString, CheetahString>> {
594        if let Some(ref mq_client_instance) = self.client_instance {
595            mq_client_instance
596                .get_mq_client_api_impl()
597                .get_broker_config(&broker_addr, self.timeout_millis.as_millis() as u64)
598                .await
599        } else {
600            Err(rocketmq_error::RocketMQError::ClientNotStarted)
601        }
602    }
603
604    async fn create_and_update_topic_config(
605        &self,
606        addr: CheetahString,
607        config: TopicConfig,
608    ) -> rocketmq_error::RocketMQResult<()> {
609        let topic = config
610            .topic_name
611            .clone()
612            .ok_or_else(|| rocketmq_error::RocketMQError::IllegalArgument("Topic name is required".into()))?;
613        let attributes = encode_topic_attributes(&config.attributes);
614        let request_header = CreateTopicRequestHeader {
615            topic,
616            default_topic: CheetahString::from_static_str(TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC),
617            read_queue_nums: config.read_queue_nums as i32,
618            write_queue_nums: config.write_queue_nums as i32,
619            perm: config.perm as i32,
620            topic_filter_type: config.topic_filter_type.to_string().into(),
621            topic_sys_flag: Some(config.topic_sys_flag as i32),
622            order: config.order,
623            attributes,
624            force: Some(false),
625            topic_request_header: None,
626        };
627
628        self.client_instance
629            .as_ref()
630            .unwrap()
631            .get_mq_client_api_impl()
632            .update_or_create_topic(&addr, request_header, self.timeout_millis.as_millis() as u64)
633            .await
634    }
635
636    async fn create_and_update_topic_config_list(
637        &self,
638        addr: CheetahString,
639        topic_config_list: Vec<TopicConfig>,
640    ) -> rocketmq_error::RocketMQResult<()> {
641        for config in topic_config_list {
642            self.create_and_update_topic_config(addr.clone(), config).await?;
643        }
644        Ok(())
645    }
646
/// Stub: not implemented yet.
///
/// Judging by its name and parameters this is meant to create or update the plain access
/// config `config` on the broker at `addr` — TODO confirm once implemented.
///
/// # Panics
///
/// The body is `todo!()`, so running this method always panics.
647    async fn create_and_update_plain_access_config(
648        &self,
649        addr: CheetahString,
650        config: PlainAccessConfig,
651    ) -> rocketmq_error::RocketMQResult<()> {
652        todo!()
653    }
654
/// Stub: not implemented yet.
///
/// Judging by its name and parameters this is meant to delete the plain access config
/// identified by `access_key` on the broker at `addr` — TODO confirm once implemented.
///
/// # Panics
///
/// The body is `todo!()`, so running this method always panics.
655    async fn delete_plain_access_config(
656        &self,
657        addr: CheetahString,
658        access_key: CheetahString,
659    ) -> rocketmq_error::RocketMQResult<()> {
660        todo!()
661    }
662
/// Stub: not implemented yet.
///
/// Judging by its name and parameters this is meant to update the global white-address
/// list on the broker at `addr`, optionally for a specific ACL file — TODO confirm once
/// implemented.
///
/// # Panics
///
/// The body is `todo!()`, so running this method always panics.
663    async fn update_global_white_addr_config(
664        &self,
665        addr: CheetahString,
666        global_white_addrs: CheetahString,
667        acl_file_full_path: Option<CheetahString>,
668    ) -> rocketmq_error::RocketMQResult<()> {
669        todo!()
670    }
671
/// Stub: not implemented yet.
///
/// Judging by its name and signature this is meant to return ACL version information
/// for the broker at `addr` — TODO confirm once implemented.
///
/// # Panics
///
/// The body is `todo!()`, so running this method always panics.
672    async fn examine_broker_cluster_acl_version_info(
673        &self,
674        addr: CheetahString,
675    ) -> rocketmq_error::RocketMQResult<CheetahString> {
676        todo!()
677    }
678
679    async fn create_and_update_subscription_group_config(
680        &self,
681        addr: CheetahString,
682        config: SubscriptionGroupConfig,
683    ) -> rocketmq_error::RocketMQResult<()> {
684        self.client_instance
685            .as_ref()
686            .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?
687            .get_mq_client_api_impl()
688            .create_subscription_group(&addr, &config, self.timeout_millis.as_millis() as u64)
689            .await
690    }
691
692    async fn create_and_update_subscription_group_config_list(
693        &self,
694        broker_addr: CheetahString,
695        configs: Vec<SubscriptionGroupConfig>,
696    ) -> rocketmq_error::RocketMQResult<()> {
697        for config in configs {
698            self.create_and_update_subscription_group_config(broker_addr.clone(), config)
699                .await?;
700        }
701        Ok(())
702    }
703
704    async fn examine_subscription_group_config(
705        &self,
706        addr: CheetahString,
707        group: CheetahString,
708    ) -> rocketmq_error::RocketMQResult<SubscriptionGroupConfig> {
709        self.client_instance
710            .as_ref()
711            .unwrap()
712            .get_mq_client_api_impl()
713            .get_subscription_group_config(&addr, group, self.timeout_millis.as_millis() as u64)
714            .await
715    }
716
717    async fn examine_topic_stats(
718        &self,
719        topic: CheetahString,
720        broker_addr: Option<CheetahString>,
721    ) -> rocketmq_error::RocketMQResult<TopicStatsTable> {
722        let timeout = self.timeout_millis.as_millis() as u64;
723        let request_header = GetTopicStatsInfoRequestHeader {
724            topic: topic.clone(),
725            topic_request_header: None,
726        };
727        if let Some(addr) = broker_addr {
728            return self
729                .client_instance
730                .as_ref()
731                .unwrap()
732                .get_mq_client_api_impl()
733                .get_topic_stats_info(&addr, request_header, timeout)
734                .await;
735        }
736
737        let topic_route = self.examine_topic_route_info(topic).await?;
738        let mut result = TopicStatsTable::new();
739        if let Some(route_data) = topic_route {
740            for broker_data in &route_data.broker_datas {
741                if let Some(master_addr) = broker_data.broker_addrs().get(&mix_all::MASTER_ID) {
742                    let stats = self
743                        .client_instance
744                        .as_ref()
745                        .unwrap()
746                        .get_mq_client_api_impl()
747                        .get_topic_stats_info(master_addr, request_header.clone(), timeout)
748                        .await?;
749                    result.get_offset_table_mut().extend(stats.into_offset_table());
750                }
751            }
752        }
753
754        Ok(result)
755    }
756
757    async fn examine_topic_stats_concurrent(&self, topic: CheetahString) -> AdminToolResult<TopicStatsTable> {
758        match self.examine_topic_stats(topic, None).await {
759            Ok(stats) => AdminToolResult::success(stats),
760            Err(error) => AdminToolResult::failure(
761                crate::common::admin_tools_result_code_enum::AdminToolsResultCodeEnum::RemotingError,
762                error.to_string(),
763            ),
764        }
765    }
766
767    async fn fetch_all_topic_list(&self) -> rocketmq_error::RocketMQResult<TopicList> {
768        self.client_instance
769            .as_ref()
770            .unwrap()
771            .get_mq_client_api_impl()
772            .get_all_topic_list_from_name_server(self.timeout_millis.as_millis() as u64)
773            .await
774    }
775
    /// Placeholder for the `fetch_topics_by_cluster` admin operation.
    ///
    /// Not implemented yet; `cluster_name` is not read.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is `todo!()`.
    async fn fetch_topics_by_cluster(&self, cluster_name: CheetahString) -> rocketmq_error::RocketMQResult<TopicList> {
        todo!()
    }
779
780    async fn fetch_broker_runtime_stats(&self, broker_addr: CheetahString) -> rocketmq_error::RocketMQResult<KVTable> {
781        self.client_instance
782            .as_ref()
783            .unwrap()
784            .get_mq_client_api_impl()
785            .get_broker_runtime_info(&broker_addr, self.timeout_millis.as_millis() as u64)
786            .await
787    }
788
789    async fn examine_consume_stats(
790        &self,
791        consumer_group: CheetahString,
792        topic: Option<CheetahString>,
793        cluster_name: Option<CheetahString>,
794        broker_addr: Option<CheetahString>,
795        timeout_millis: Option<u64>,
796    ) -> rocketmq_error::RocketMQResult<ConsumeStats> {
797        let timeout = timeout_millis.unwrap_or(self.timeout_millis.as_millis() as u64);
798        let topic_str = topic.clone().unwrap_or_default();
799
800        if let Some(addr) = broker_addr {
801            let request_header = GetConsumeStatsRequestHeader {
802                consumer_group,
803                topic: topic_str,
804                topic_request_header: None,
805            };
806            return self
807                .client_instance
808                .as_ref()
809                .unwrap()
810                .get_mq_client_api_impl()
811                .get_consume_stats(&addr, request_header, timeout)
812                .await;
813        }
814
815        let retry_topic: CheetahString = rocketmq_common::common::mix_all::get_retry_topic(&consumer_group).into();
816        let topic_route = self
817            .client_instance
818            .as_ref()
819            .unwrap()
820            .mq_client_api_impl
821            .as_ref()
822            .unwrap()
823            .get_topic_route_info_from_name_server(&retry_topic, timeout)
824            .await?;
825
826        let mut result = ConsumeStats::new();
827
828        if let Some(route_data) = topic_route {
829            for bd in &route_data.broker_datas {
830                if let Some(master_addr) = bd.broker_addrs().get(&rocketmq_common::common::mix_all::MASTER_ID) {
831                    let request_header = GetConsumeStatsRequestHeader {
832                        consumer_group: consumer_group.clone(),
833                        topic: topic_str.clone(),
834                        topic_request_header: None,
835                    };
836                    let cs = self
837                        .client_instance
838                        .as_ref()
839                        .unwrap()
840                        .get_mq_client_api_impl()
841                        .get_consume_stats(master_addr, request_header, timeout)
842                        .await?;
843
844                    result.get_offset_table_mut().extend(cs.offset_table);
845                    let new_tps = result.get_consume_tps() + cs.consume_tps;
846                    result.set_consume_tps(new_tps);
847                }
848            }
849        }
850
851        Ok(result)
852    }
853
854    async fn check_rocksdb_cq_write_progress(
855        &self,
856        broker_addr: CheetahString,
857        topic: CheetahString,
858        check_store_time: i64,
859    ) -> rocketmq_error::RocketMQResult<CheckRocksdbCqWriteResult> {
860        self.client_instance
861            .as_ref()
862            .unwrap()
863            .get_mq_client_api_impl()
864            .check_rocksdb_cq_write_progress(
865                &broker_addr,
866                topic,
867                check_store_time,
868                self.timeout_millis.as_millis() as u64,
869            )
870            .await
871    }
872
873    async fn examine_broker_cluster_info(&self) -> rocketmq_error::RocketMQResult<ClusterInfo> {
874        self.client_instance
875            .as_ref()
876            .unwrap()
877            .get_mq_client_api_impl()
878            .get_broker_cluster_info(self.timeout_millis.as_millis() as u64)
879            .await
880    }
881
882    async fn examine_topic_route_info(
883        &self,
884        topic: CheetahString,
885    ) -> rocketmq_error::RocketMQResult<Option<TopicRouteData>> {
886        self.client_instance
887            .as_ref()
888            .unwrap()
889            .mq_client_api_impl
890            .as_ref()
891            .unwrap()
892            .get_topic_route_info_from_name_server(&topic, self.timeout_millis.as_millis() as u64)
893            .await
894    }
895
896    async fn examine_consumer_connection_info(
897        &self,
898        consumer_group: CheetahString,
899        broker_addr: Option<CheetahString>,
900    ) -> rocketmq_error::RocketMQResult<ConsumerConnection> {
901        let mut result = ConsumerConnection::new();
902        let timeout = self.timeout_millis.as_millis() as u64;
903
904        let selected_addr = if let Some(broker_addr) = broker_addr {
905            Some(broker_addr)
906        } else {
907            let topic = CheetahString::from_string(mix_all::get_retry_topic(consumer_group.as_str()));
908            let topic_route_data = self
909                .client_instance
910                .as_ref()
911                .unwrap()
912                .get_mq_client_api_impl()
913                .get_topic_route_info_from_name_server(&topic, timeout)
914                .await?;
915
916            topic_route_data.and_then(|topic_route_data| {
917                topic_route_data
918                    .broker_datas
919                    .choose(&mut rand::rng())
920                    .and_then(|broker_data| broker_data.select_broker_addr())
921            })
922        };
923
924        if let Some(broker_addr) = selected_addr {
925            result = self
926                .client_instance
927                .as_ref()
928                .unwrap()
929                .get_mq_client_api_impl()
930                .get_consumer_connection_list(broker_addr.as_str(), consumer_group.clone(), timeout)
931                .await?;
932        }
933
934        if result.get_connection_set().is_empty() {
935            return Err(mq_client_err!(
936                rocketmq_remoting::code::response_code::ResponseCode::ConsumerNotOnline,
937                "Not found the consumer group connection"
938            ));
939        }
940
941        Ok(result)
942    }
943
944    async fn examine_producer_connection_info(
945        &self,
946        producer_group: CheetahString,
947        topic: CheetahString,
948    ) -> rocketmq_error::RocketMQResult<ProducerConnection> {
949        let mut result = ProducerConnection::new();
950        let timeout = self.timeout_millis.as_millis() as u64;
951
952        if let Some(topic_route_data) = self.examine_topic_route_info(topic).await? {
953            let brokers = &topic_route_data.broker_datas;
954            let selected_addr = brokers
955                .choose(&mut rand::rng())
956                .and_then(|broker_data| broker_data.select_broker_addr());
957            if let Some(addr) = selected_addr {
958                result = self
959                    .client_instance
960                    .as_ref()
961                    .unwrap()
962                    .get_mq_client_api_impl()
963                    .get_producer_connection_list(addr.as_str(), producer_group.clone(), timeout)
964                    .await?;
965            }
966        }
967
968        if result.connection_set().is_empty() {
969            return Err(mq_client_err!("Not found the producer group connection"));
970        }
971
972        Ok(result)
973    }
974
975    async fn get_all_producer_info(
976        &self,
977        broker_addr: CheetahString,
978    ) -> rocketmq_error::RocketMQResult<ProducerTableInfo> {
979        self.client_instance
980            .as_ref()
981            .unwrap()
982            .get_mq_client_api_impl()
983            .get_all_producer_info(broker_addr.as_str(), self.timeout_millis.as_millis() as u64)
984            .await
985    }
986
987    async fn get_name_server_address_list(&self) -> Vec<CheetahString> {
988        self.client_instance
989            .as_ref()
990            .unwrap()
991            .get_mq_client_api_impl()
992            .get_name_server_address_list()
993            .to_vec()
994    }
995
996    async fn wipe_write_perm_of_broker(
997        &self,
998        namesrv_addr: CheetahString,
999        broker_name: CheetahString,
1000    ) -> rocketmq_error::RocketMQResult<i32> {
1001        self.client_instance
1002            .as_ref()
1003            .unwrap()
1004            .get_mq_client_api_impl()
1005            .wipe_write_perm_of_broker(namesrv_addr, broker_name, self.timeout_millis.as_millis() as u64)
1006            .await
1007    }
1008
1009    async fn add_write_perm_of_broker(
1010        &self,
1011        namesrv_addr: CheetahString,
1012        broker_name: CheetahString,
1013    ) -> rocketmq_error::RocketMQResult<i32> {
1014        self.client_instance
1015            .as_ref()
1016            .unwrap()
1017            .get_mq_client_api_impl()
1018            .add_write_perm_of_broker(namesrv_addr, broker_name, self.timeout_millis.as_millis() as u64)
1019            .await
1020    }
1021
    /// Placeholder for the `put_kv_config` admin operation.
    ///
    /// Not implemented yet; none of the parameters are read.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is `todo!()`.
    async fn put_kv_config(&self, namespace: CheetahString, key: CheetahString, value: CheetahString) {
        todo!()
    }
1025
1026    async fn get_kv_config(
1027        &self,
1028        namespace: CheetahString,
1029        key: CheetahString,
1030    ) -> rocketmq_error::RocketMQResult<CheetahString> {
1031        Ok(self
1032            .client_instance
1033            .as_ref()
1034            .unwrap()
1035            .get_mq_client_api_impl()
1036            .get_kvconfig_value(namespace, key, self.timeout_millis.as_millis() as u64)
1037            .await?
1038            .unwrap_or_default())
1039    }
1040
    /// Placeholder for the `get_kv_list_by_namespace` admin operation.
    ///
    /// Not implemented yet; `namespace` is not read.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is `todo!()`.
    async fn get_kv_list_by_namespace(&self, namespace: CheetahString) -> rocketmq_error::RocketMQResult<KVTable> {
        todo!()
    }
1044
1045    async fn delete_topic(
1046        &self,
1047        topic_name: CheetahString,
1048        cluster_name: CheetahString,
1049    ) -> rocketmq_error::RocketMQResult<()> {
1050        let cluster_info = self.examine_broker_cluster_info().await?;
1051        let mut broker_addrs = HashSet::new();
1052        if let Some(cluster_addr_table) = cluster_info.cluster_addr_table.as_ref() {
1053            if let Some(broker_names) = cluster_addr_table.get(&cluster_name) {
1054                if let Some(broker_addr_table) = cluster_info.broker_addr_table.as_ref() {
1055                    for broker_name in broker_names {
1056                        if let Some(broker_data) = broker_addr_table.get(broker_name) {
1057                            broker_addrs.extend(broker_data.broker_addrs().values().cloned());
1058                        }
1059                    }
1060                }
1061            }
1062        }
1063        self.delete_topic_in_broker(broker_addrs, topic_name.clone()).await?;
1064
1065        let namesrv_addrs: HashSet<CheetahString> = self.get_name_server_address_list().await.into_iter().collect();
1066        self.delete_topic_in_name_server(namesrv_addrs, Some(cluster_name), topic_name)
1067            .await
1068    }
1069
1070    async fn delete_topic_in_broker(
1071        &self,
1072        addrs: HashSet<CheetahString>,
1073        topic: CheetahString,
1074    ) -> rocketmq_error::RocketMQResult<()> {
1075        let request_header = DeleteTopicRequestHeader {
1076            topic: topic.clone(),
1077            topic_request_header: None,
1078        };
1079        let api = self.client_instance.as_ref().unwrap().get_mq_client_api_impl();
1080        let timeout = self.timeout_millis.as_millis() as u64;
1081        for addr in addrs {
1082            api.delete_topic_in_broker(
1083                &addr,
1084                DeleteTopicRequestHeader {
1085                    topic: request_header.topic.clone(),
1086                    topic_request_header: None,
1087                },
1088                timeout,
1089            )
1090            .await?;
1091        }
1092        Ok(())
1093    }
1094
1095    async fn delete_topic_in_name_server(
1096        &self,
1097        addrs: HashSet<CheetahString>,
1098        cluster_name: Option<CheetahString>,
1099        topic: CheetahString,
1100    ) -> rocketmq_error::RocketMQResult<()> {
1101        let request_header = DeleteTopicFromNamesrvRequestHeader::new(topic, cluster_name);
1102        let api = self.client_instance.as_ref().unwrap().get_mq_client_api_impl();
1103        let timeout = self.timeout_millis.as_millis() as u64;
1104        for addr in addrs {
1105            api.delete_topic_in_nameserver(&addr, request_header.clone(), timeout)
1106                .await?;
1107        }
1108        Ok(())
1109    }
1110
1111    async fn delete_subscription_group(
1112        &self,
1113        addr: CheetahString,
1114        group_name: CheetahString,
1115        remove_offset: Option<bool>,
1116    ) -> rocketmq_error::RocketMQResult<()> {
1117        self.client_instance
1118            .as_ref()
1119            .unwrap()
1120            .get_mq_client_api_impl()
1121            .delete_subscription_group(
1122                &addr,
1123                group_name,
1124                remove_offset.unwrap_or(false),
1125                self.timeout_millis.as_millis() as u64,
1126            )
1127            .await
1128    }
1129
1130    async fn create_and_update_kv_config(
1131        &self,
1132        namespace: CheetahString,
1133        key: CheetahString,
1134        value: CheetahString,
1135    ) -> rocketmq_error::RocketMQResult<()> {
1136        self.client_instance
1137            .as_ref()
1138            .unwrap()
1139            .get_mq_client_api_impl()
1140            .put_kvconfig_value(namespace, key, value, self.timeout_millis.as_millis() as u64)
1141            .await
1142    }
1143
1144    async fn delete_kv_config(
1145        &self,
1146        namespace: CheetahString,
1147        key: CheetahString,
1148    ) -> rocketmq_error::RocketMQResult<()> {
1149        self.client_instance
1150            .as_ref()
1151            .unwrap()
1152            .get_mq_client_api_impl()
1153            .delete_kvconfig_value(namespace, key, self.timeout_millis.as_millis() as u64)
1154            .await
1155    }
1156
1157    async fn reset_offset_by_timestamp(
1158        &self,
1159        cluster_name: Option<CheetahString>,
1160        topic: CheetahString,
1161        group: CheetahString,
1162        timestamp: u64,
1163        is_force: bool,
1164    ) -> rocketmq_error::RocketMQResult<HashMap<MessageQueue, u64>> {
1165        let topic_route = self.examine_topic_route_info(topic.clone()).await?;
1166        let mut offset_table = HashMap::new();
1167        let timeout = self.timeout_millis.as_millis() as u64;
1168
1169        if let Some(route_data) = topic_route {
1170            for broker_data in &route_data.broker_datas {
1171                if let Some(expected_cluster) = cluster_name.as_ref() {
1172                    if broker_data.cluster() != expected_cluster {
1173                        continue;
1174                    }
1175                }
1176                if let Some(master_addr) = broker_data.broker_addrs().get(&mix_all::MASTER_ID) {
1177                    let request_header = ResetOffsetRequestHeader {
1178                        topic: topic.clone(),
1179                        group: group.clone(),
1180                        queue_id: -1,
1181                        offset: None,
1182                        timestamp: timestamp as i64,
1183                        is_force,
1184                        topic_request_header: None,
1185                    };
1186                    let offsets = self
1187                        .client_instance
1188                        .as_ref()
1189                        .unwrap()
1190                        .get_mq_client_api_impl()
1191                        .invoke_broker_to_reset_offset(master_addr, request_header, timeout)
1192                        .await?;
1193                    offset_table.extend(offsets.into_iter().map(|(mq, offset)| (mq, offset as u64)));
1194                }
1195            }
1196        }
1197
1198        Ok(offset_table)
1199    }
1200
    /// Placeholder for the `reset_offset_new` admin operation.
    ///
    /// Not implemented yet; none of the parameters are read.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is `todo!()`.
    async fn reset_offset_new(
        &self,
        consumer_group: CheetahString,
        topic: CheetahString,
        timestamp: u64,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
1209
1210    async fn get_consume_status(
1211        &self,
1212        topic: CheetahString,
1213        group: CheetahString,
1214        client_addr: CheetahString,
1215    ) -> rocketmq_error::RocketMQResult<HashMap<CheetahString, HashMap<MessageQueue, u64>>> {
1216        let topic_route_data = self.examine_topic_route_info(topic.clone()).await?;
1217        if let Some(route_data) = topic_route_data {
1218            if !route_data.broker_datas.is_empty() {
1219                if let Some(addr) = route_data.broker_datas[0].select_broker_addr() {
1220                    let result = self
1221                        .client_instance
1222                        .as_ref()
1223                        .unwrap()
1224                        .get_mq_client_api_impl()
1225                        .invoke_broker_to_get_consumer_status(
1226                            addr.as_str(),
1227                            topic,
1228                            group,
1229                            client_addr,
1230                            self.timeout_millis.as_millis() as u64,
1231                        )
1232                        .await?;
1233                    let converted: HashMap<CheetahString, HashMap<MessageQueue, u64>> = result
1234                        .into_iter()
1235                        .map(|(k, v)| {
1236                            let inner: HashMap<MessageQueue, u64> =
1237                                v.into_iter().map(|(mq, off)| (mq, off as u64)).collect();
1238                            (k, inner)
1239                        })
1240                        .collect();
1241                    return Ok(converted);
1242                }
1243            }
1244        }
1245        Ok(HashMap::new())
1246    }
1247
1248    async fn create_or_update_order_conf(
1249        &self,
1250        key: CheetahString,
1251        value: CheetahString,
1252        is_cluster: bool,
1253    ) -> rocketmq_error::RocketMQResult<()> {
1254        if is_cluster {
1255            return self
1256                .client_instance
1257                .as_ref()
1258                .unwrap()
1259                .get_mq_client_api_impl()
1260                .put_kvconfig_value(
1261                    CheetahString::from_static_str(NAMESPACE_ORDER_TOPIC_CONFIG),
1262                    key,
1263                    value,
1264                    self.timeout_millis.as_millis() as u64,
1265                )
1266                .await;
1267        }
1268
1269        let existing = self
1270            .client_instance
1271            .as_ref()
1272            .unwrap()
1273            .get_mq_client_api_impl()
1274            .get_kvconfig_value(
1275                CheetahString::from_static_str(NAMESPACE_ORDER_TOPIC_CONFIG),
1276                key.clone(),
1277                self.timeout_millis.as_millis() as u64,
1278            )
1279            .await?
1280            .unwrap_or_default();
1281
1282        let merged_order_conf = merge_order_conf_entries(existing.as_str(), value.as_str());
1283
1284        self.client_instance
1285            .as_ref()
1286            .unwrap()
1287            .get_mq_client_api_impl()
1288            .put_kvconfig_value(
1289                CheetahString::from_static_str(NAMESPACE_ORDER_TOPIC_CONFIG),
1290                key,
1291                merged_order_conf.into(),
1292                self.timeout_millis.as_millis() as u64,
1293            )
1294            .await
1295    }
1296
1297    async fn query_topic_consume_by_who(&self, topic: CheetahString) -> rocketmq_error::RocketMQResult<GroupList> {
1298        let topic_route = self
1299            .client_instance
1300            .as_ref()
1301            .unwrap()
1302            .mq_client_api_impl
1303            .as_ref()
1304            .unwrap()
1305            .get_topic_route_info_from_name_server(&topic, self.timeout_millis.as_millis() as u64)
1306            .await?;
1307
1308        if let Some(route_data) = topic_route {
1309            for bd in &route_data.broker_datas {
1310                if let Some(master_addr) = bd.broker_addrs().get(&rocketmq_common::common::mix_all::MASTER_ID) {
1311                    let request_header = QueryTopicConsumeByWhoRequestHeader {
1312                        topic: topic.clone(),
1313                        topic_request_header: None,
1314                    };
1315                    return self
1316                        .client_instance
1317                        .as_ref()
1318                        .unwrap()
1319                        .get_mq_client_api_impl()
1320                        .query_topic_consume_by_who(master_addr, request_header, self.timeout_millis.as_millis() as u64)
1321                        .await;
1322                }
1323            }
1324        }
1325
1326        Ok(GroupList::default())
1327    }
1328
    /// Placeholder for the `query_topics_by_consumer` admin operation.
    ///
    /// Not implemented yet; `group` is not read.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is `todo!()`.
    async fn query_topics_by_consumer(&self, group: CheetahString) -> rocketmq_error::RocketMQResult<TopicList> {
        todo!()
    }
1332
    /// Placeholder for the `query_topics_by_consumer_concurrent` admin operation.
    ///
    /// Not implemented yet; `group` is not read.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is `todo!()`.
    async fn query_topics_by_consumer_concurrent(&self, group: CheetahString) -> AdminToolResult<TopicList> {
        todo!()
    }
1336
    /// Placeholder for the `query_subscription` admin operation.
    ///
    /// Not implemented yet; none of the parameters are read.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is `todo!()`.
    async fn query_subscription(
        &self,
        group: CheetahString,
        topic: CheetahString,
    ) -> rocketmq_error::RocketMQResult<SubscriptionData> {
        todo!()
    }
1344
    /// Placeholder for the `clean_expired_consumer_queue` admin operation.
    ///
    /// Not implemented yet; none of the parameters are read.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is `todo!()`.
    async fn clean_expired_consumer_queue(
        &self,
        cluster: Option<CheetahString>,
        addr: Option<CheetahString>,
    ) -> rocketmq_error::RocketMQResult<bool> {
        todo!()
    }
1352
    /// Placeholder for the `delete_expired_commit_log` admin operation.
    ///
    /// Not implemented yet; none of the parameters are read.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is `todo!()`.
    async fn delete_expired_commit_log(
        &self,
        cluster: Option<CheetahString>,
        addr: Option<CheetahString>,
    ) -> rocketmq_error::RocketMQResult<bool> {
        todo!()
    }
1360
    /// Placeholder for the `clean_unused_topic` admin operation.
    ///
    /// Not implemented yet; none of the parameters are read.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is `todo!()`.
    async fn clean_unused_topic(
        &self,
        cluster: Option<CheetahString>,
        addr: Option<CheetahString>,
    ) -> rocketmq_error::RocketMQResult<bool> {
        todo!()
    }
1368
1369    async fn get_consumer_running_info(
1370        &self,
1371        consumer_group: CheetahString,
1372        client_id: CheetahString,
1373        jstack: bool,
1374        _metrics: Option<bool>,
1375    ) -> rocketmq_error::RocketMQResult<ConsumerRunningInfo> {
1376        let broker_addr = self
1377            .examine_consumer_connection_info(consumer_group.clone(), None)
1378            .await?
1379            .get_connection_set()
1380            .iter()
1381            .find(|connection| connection.get_client_id() == client_id)
1382            .map(|connection| connection.get_client_addr().clone())
1383            .ok_or_else(|| {
1384                rocketmq_error::RocketMQError::IllegalArgument(format!(
1385                    "Client `{}` was not found in consumer group `{}`",
1386                    client_id, consumer_group
1387                ))
1388            })?;
1389
1390        self.client_instance
1391            .as_ref()
1392            .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?
1393            .get_mq_client_api_impl()
1394            .get_consumer_running_info(
1395                &broker_addr,
1396                consumer_group,
1397                client_id,
1398                jstack,
1399                self.timeout_millis.as_millis() as u64,
1400            )
1401            .await
1402    }
1403
1404    async fn consume_message_directly(
1405        &self,
1406        consumer_group: CheetahString,
1407        client_id: CheetahString,
1408        topic: CheetahString,
1409        msg_id: CheetahString,
1410    ) -> rocketmq_error::RocketMQResult<ConsumeMessageDirectlyResult> {
1411        let consumer_connection = self
1412            .examine_consumer_connection_info(consumer_group.clone(), None)
1413            .await?;
1414        let (resolved_client_id, client_addr) =
1415            select_consumer_direct_connection(&consumer_group, &consumer_connection, Some(&client_id))?;
1416        let message = MQAdminExt::query_message(self, CheetahString::default(), topic.clone(), msg_id.clone()).await?;
1417        let request_header = ConsumeMessageDirectlyResultRequestHeader {
1418            consumer_group,
1419            client_id: Some(resolved_client_id),
1420            msg_id: Some(msg_id),
1421            broker_name: (!message.broker_name().is_empty()).then(|| message.broker_name.clone()),
1422            topic: Some(topic),
1423            topic_sys_flag: None,
1424            group_sys_flag: None,
1425            topic_request_header: None,
1426        };
1427
1428        self.client_instance
1429            .as_ref()
1430            .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?
1431            .get_mq_client_api_impl()
1432            .consume_message_directly(
1433                &client_addr,
1434                request_header,
1435                &message,
1436                self.timeout_millis.as_millis() as u64,
1437            )
1438            .await
1439    }
1440
1441    async fn consume_message_directly_ext(
1442        &self,
1443        _cluster_name: CheetahString,
1444        consumer_group: CheetahString,
1445        client_id: CheetahString,
1446        topic: CheetahString,
1447        msg_id: CheetahString,
1448    ) -> rocketmq_error::RocketMQResult<ConsumeMessageDirectlyResult> {
1449        self.consume_message_directly(consumer_group, client_id, topic, msg_id)
1450            .await
1451    }
1452
    /// Placeholder for the `clone_group_offset` admin operation.
    ///
    /// Not implemented yet; none of the parameters are read.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is `todo!()`.
    async fn clone_group_offset(
        &self,
        src_group: CheetahString,
        dest_group: CheetahString,
        topic: CheetahString,
        is_offline: bool,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
1462
    /// Placeholder for the `get_cluster_list` admin operation.
    ///
    /// Not implemented yet; `topic` is not read.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is `todo!()`.
    async fn get_cluster_list(&self, topic: String) -> rocketmq_error::RocketMQResult<HashSet<CheetahString>> {
        todo!()
    }
1466
1467    async fn get_topic_cluster_list(&self, topic: String) -> rocketmq_error::RocketMQResult<HashSet<CheetahString>> {
1468        let cluster_info = self.examine_broker_cluster_info().await?;
1469        let topic_route_data = self.examine_topic_route_info(topic.into()).await?.unwrap();
1470        let broker_data = topic_route_data
1471            .broker_datas
1472            .first()
1473            .ok_or_else(|| mq_client_err!("Broker datas is empty"))?;
1474        let mut cluster_set = HashSet::new();
1475        let broker_name = broker_data.broker_name();
1476        if let Some(cluster_addr_table) = cluster_info.cluster_addr_table.as_ref() {
1477            cluster_set.extend(
1478                cluster_addr_table
1479                    .iter()
1480                    .filter(|(cluster_name, broker_names)| broker_names.contains(broker_name))
1481                    .map(|(cluster_name, broker_names)| cluster_name.clone()),
1482            );
1483        }
1484        Ok(cluster_set)
1485    }
1486
1487    async fn get_all_topic_config(
1488        &self,
1489        broker_addr: CheetahString,
1490        timeout_millis: u64,
1491    ) -> rocketmq_error::RocketMQResult<TopicConfigSerializeWrapper> {
1492        self.client_instance
1493            .as_ref()
1494            .unwrap()
1495            .get_mq_client_api_impl()
1496            .get_all_topic_config(&broker_addr, timeout_millis)
1497            .await
1498    }
1499
1500    async fn get_user_topic_config(
1501        &self,
1502        broker_addr: CheetahString,
1503        special_topic: bool,
1504        timeout_millis: u64,
1505    ) -> rocketmq_error::RocketMQResult<TopicConfigSerializeWrapper> {
1506        let mut topic_config_wrapper = self.get_all_topic_config(broker_addr, timeout_millis).await?;
1507
1508        if let Some(ref mut topic_table) = topic_config_wrapper.topic_config_table_mut() {
1509            topic_table.retain(|topic_name, topic_config| {
1510                if TopicValidator::is_system_topic(topic_name.as_str()) {
1511                    return false;
1512                }
1513                if !special_topic
1514                    && (topic_name.starts_with(RETRY_GROUP_TOPIC_PREFIX)
1515                        || topic_name.starts_with(DLQ_GROUP_TOPIC_PREFIX))
1516                {
1517                    return false;
1518                }
1519                if !PermName::is_valid(topic_config.perm) {
1520                    return false;
1521                }
1522                true
1523            });
1524        }
1525
1526        Ok(topic_config_wrapper)
1527    }
1528
    /// Placeholder for the `update_consume_offset` admin operation.
    ///
    /// Not implemented yet; none of the parameters are read.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is `todo!()`.
    async fn update_consume_offset(
        &self,
        broker_addr: CheetahString,
        consume_group: CheetahString,
        mq: MessageQueue,
        offset: u64,
    ) -> rocketmq_error::RocketMQResult<()> {
        todo!()
    }
1538
1539    async fn update_name_server_config(
1540        &self,
1541        properties: HashMap<CheetahString, CheetahString>,
1542        name_servers: Option<Vec<CheetahString>>,
1543    ) -> rocketmq_error::RocketMQResult<()> {
1544        self.client_instance
1545            .as_ref()
1546            .unwrap()
1547            .get_mq_client_api_impl()
1548            .update_name_server_config(properties, name_servers, self.timeout_millis.as_millis() as u64)
1549            .await
1550    }
1551
1552    async fn get_name_server_config(
1553        &self,
1554        name_servers: Vec<CheetahString>,
1555    ) -> rocketmq_error::RocketMQResult<HashMap<CheetahString, HashMap<CheetahString, CheetahString>>> {
1556        Ok(self
1557            .client_instance
1558            .as_ref()
1559            .unwrap()
1560            .mq_client_api_impl
1561            .as_ref()
1562            .unwrap()
1563            .get_name_server_config(Some(name_servers), self.timeout_millis)
1564            .await?
1565            .unwrap_or_default())
1566    }
1567
1568    async fn probe_name_server(&self, name_server: CheetahString) -> rocketmq_error::RocketMQResult<()> {
1569        self.client_instance
1570            .as_ref()
1571            .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?
1572            .get_mq_client_api_impl()
1573            .probe_name_server(&name_server, self.timeout_millis)
1574            .await
1575    }
1576
1577    async fn resume_check_half_message(
1578        &self,
1579        topic: CheetahString,
1580        msg_id: CheetahString,
1581    ) -> rocketmq_error::RocketMQResult<bool> {
1582        todo!()
1583    }
1584
1585    async fn set_message_request_mode(
1586        &self,
1587        broker_addr: CheetahString,
1588        topic: CheetahString,
1589        consumer_group: CheetahString,
1590        mode: MessageRequestMode,
1591        pop_work_group_size: i32,
1592        timeout_millis: u64,
1593    ) -> rocketmq_error::RocketMQResult<()> {
1594        let mut mq_client_api = self.client_instance.as_ref().unwrap().get_mq_client_api_impl();
1595        match mq_client_api
1596            .set_message_request_mode(
1597                &broker_addr,
1598                &topic,
1599                &consumer_group,
1600                mode,
1601                pop_work_group_size,
1602                timeout_millis,
1603            )
1604            .await
1605        {
1606            Ok(_) => Ok(()),
1607            Err(e) => Err(e),
1608        }
1609    }
1610
1611    async fn reset_offset_by_queue_id(
1612        &self,
1613        broker_addr: CheetahString,
1614        consumer_group: CheetahString,
1615        topic_name: CheetahString,
1616        queue_id: i32,
1617        reset_offset: u64,
1618    ) -> rocketmq_error::RocketMQResult<()> {
1619        todo!()
1620    }
1621
1622    async fn examine_topic_config(
1623        &self,
1624        addr: CheetahString,
1625        topic: CheetahString,
1626    ) -> rocketmq_error::RocketMQResult<TopicConfig> {
1627        let request_header = GetTopicConfigRequestHeader {
1628            topic,
1629            topic_request_header: None,
1630        };
1631        let mapping: TopicConfigAndQueueMapping = self
1632            .client_instance
1633            .as_ref()
1634            .unwrap()
1635            .get_mq_client_api_impl()
1636            .get_topic_config(&addr, request_header, self.timeout_millis.as_millis() as u64)
1637            .await?;
1638
1639        Ok(mapping.topic_config)
1640    }
1641
1642    async fn create_static_topic(
1643        &self,
1644        addr: CheetahString,
1645        default_topic: CheetahString,
1646        topic_config: TopicConfig,
1647        mapping_detail: TopicQueueMappingDetail,
1648        force: bool,
1649    ) -> rocketmq_error::RocketMQResult<()> {
1650        todo!()
1651    }
1652
1653    async fn get_controller_meta_data(
1654        &self,
1655        controller_addr: CheetahString,
1656    ) -> rocketmq_error::RocketMQResult<GetMetaDataResponseHeader> {
1657        if let Some(ref mq_client_instance) = self.client_instance {
1658            Ok(mq_client_instance
1659                .get_mq_client_api_impl()
1660                .get_controller_metadata(controller_addr, self.timeout_millis.as_millis() as u64)
1661                .await?)
1662        } else {
1663            Err(rocketmq_error::RocketMQError::ClientNotStarted)
1664        }
1665    }
1666
1667    async fn reset_master_flush_offset(
1668        &self,
1669        broker_addr: CheetahString,
1670        master_flush_offset: u64,
1671    ) -> rocketmq_error::RocketMQResult<()> {
1672        if let Some(ref mq_client_instance) = self.client_instance {
1673            mq_client_instance
1674                .get_mq_client_api_impl()
1675                .reset_master_flush_offset(&broker_addr, master_flush_offset as i64)
1676                .await
1677        } else {
1678            Err(rocketmq_error::RocketMQError::ClientNotStarted)
1679        }
1680    }
1681
1682    async fn get_controller_config(
1683        &self,
1684        controller_servers: Vec<CheetahString>,
1685    ) -> rocketmq_error::RocketMQResult<HashMap<CheetahString, HashMap<CheetahString, CheetahString>>> {
1686        if let Some(ref mq_client_instance) = self.client_instance {
1687            let mut result: HashMap<CheetahString, HashMap<CheetahString, CheetahString>> = HashMap::new();
1688            let mq_client_api = mq_client_instance.get_mq_client_api_impl();
1689            let timeout_millis = self.timeout_millis.as_millis() as u64;
1690
1691            for controller_addr in controller_servers {
1692                match mq_client_api
1693                    .get_controller_config(controller_addr.clone(), timeout_millis)
1694                    .await
1695                {
1696                    Ok(config) => {
1697                        result.insert(controller_addr, config);
1698                    }
1699                    Err(e) => {
1700                        eprintln!("Failed to get config from controller {}: {}", controller_addr, e);
1701                    }
1702                }
1703            }
1704
1705            Ok(result)
1706        } else {
1707            Err(rocketmq_error::RocketMQError::ClientNotStarted)
1708        }
1709    }
1710
1711    async fn update_controller_config(
1712        &self,
1713        properties: HashMap<CheetahString, CheetahString>,
1714        controllers: Vec<CheetahString>,
1715    ) -> rocketmq_error::RocketMQResult<()> {
1716        todo!()
1717    }
1718
1719    async fn clean_controller_broker_data(
1720        &self,
1721        controller_addr: CheetahString,
1722        cluster_name: CheetahString,
1723        broker_name: CheetahString,
1724        broker_controller_ids_to_clean: Option<CheetahString>,
1725        is_clean_living_broker: bool,
1726    ) -> rocketmq_error::RocketMQResult<()> {
1727        todo!()
1728    }
1729
1730    async fn update_cold_data_flow_ctr_group_config(
1731        &self,
1732        broker_addr: CheetahString,
1733        properties: HashMap<CheetahString, CheetahString>,
1734    ) -> rocketmq_error::RocketMQResult<()> {
1735        self.client_instance
1736            .as_ref()
1737            .unwrap()
1738            .get_mq_client_api_impl()
1739            .update_cold_data_flow_ctr_group_config(broker_addr, properties, self.timeout_millis.as_millis() as u64)
1740            .await
1741    }
1742
1743    async fn remove_cold_data_flow_ctr_group_config(
1744        &self,
1745        broker_addr: CheetahString,
1746        consumer_group: CheetahString,
1747    ) -> rocketmq_error::RocketMQResult<()> {
1748        todo!()
1749    }
1750
1751    async fn get_cold_data_flow_ctr_info(
1752        &self,
1753        broker_addr: CheetahString,
1754    ) -> rocketmq_error::RocketMQResult<CheetahString> {
1755        todo!()
1756    }
1757
1758    async fn set_commit_log_read_ahead_mode(
1759        &self,
1760        broker_addr: CheetahString,
1761        mode: CheetahString,
1762    ) -> rocketmq_error::RocketMQResult<CheetahString> {
1763        todo!()
1764    }
1765
1766    async fn create_user(
1767        &self,
1768        broker_addr: CheetahString,
1769        username: CheetahString,
1770        password: CheetahString,
1771        user_type: CheetahString,
1772    ) -> rocketmq_error::RocketMQResult<()> {
1773        let user_info = UserInfo {
1774            username: Some(username),
1775            user_type: Some(user_type),
1776            password: Some(password),
1777            user_status: None,
1778        };
1779
1780        if let Some(ref mq_client_instance) = self.client_instance {
1781            let mq_client_api = mq_client_instance.get_mq_client_api_impl();
1782            let timeout_millis = self.timeout_millis.as_millis() as u64;
1783            mq_client_api
1784                .create_user(broker_addr, &user_info, timeout_millis)
1785                .await?;
1786            Ok(())
1787        } else {
1788            Err(rocketmq_error::RocketMQError::ClientNotStarted)
1789        }
1790    }
1791
1792    async fn update_user(
1793        &self,
1794        broker_addr: CheetahString,
1795        username: CheetahString,
1796        password: CheetahString,
1797        user_type: CheetahString,
1798        user_status: CheetahString,
1799    ) -> rocketmq_error::RocketMQResult<()> {
1800        let mut user_info = UserInfo {
1801            username: Some(username),
1802            user_type: Some(user_type),
1803            password: Some(password),
1804            user_status: Some(user_status),
1805        };
1806
1807        if let Some(ref mq_client_instance) = self.client_instance {
1808            let mq_client_api = mq_client_instance.get_mq_client_api_impl();
1809            let timeout_millis = self.timeout_millis.as_millis() as u64;
1810            mq_client_api
1811                .update_user(broker_addr, &user_info, timeout_millis)
1812                .await?;
1813            Ok(())
1814        } else {
1815            Err(rocketmq_error::RocketMQError::ClientNotStarted)
1816        }
1817    }
1818
1819    async fn delete_user(
1820        &self,
1821        broker_addr: CheetahString,
1822        username: CheetahString,
1823    ) -> rocketmq_error::RocketMQResult<()> {
1824        if let Some(ref mq_client_instance) = self.client_instance {
1825            let mq_client_api = mq_client_instance.get_mq_client_api_impl();
1826            let timeout_millis = self.timeout_millis.as_millis() as u64;
1827            mq_client_api.delete_user(broker_addr, username, timeout_millis).await?;
1828            Ok(())
1829        } else {
1830            Err(rocketmq_error::RocketMQError::ClientNotStarted)
1831        }
1832    }
1833
1834    async fn create_acl(
1835        &self,
1836        broker_addr: CheetahString,
1837        subject: CheetahString,
1838        resources: Vec<CheetahString>,
1839        actions: Vec<CheetahString>,
1840        source_ips: Vec<CheetahString>,
1841        decision: CheetahString,
1842    ) -> rocketmq_error::RocketMQResult<()> {
1843        todo!()
1844    }
1845
1846    async fn update_acl(
1847        &self,
1848        broker_addr: CheetahString,
1849        subject: CheetahString,
1850        resources: Vec<CheetahString>,
1851        actions: Vec<CheetahString>,
1852        source_ips: Vec<CheetahString>,
1853        decision: CheetahString,
1854    ) -> rocketmq_error::RocketMQResult<()> {
1855        todo!()
1856    }
1857
1858    async fn delete_acl(
1859        &self,
1860        broker_addr: CheetahString,
1861        subject: CheetahString,
1862        resource: CheetahString,
1863    ) -> rocketmq_error::RocketMQResult<()> {
1864        if let Some(ref client_instance) = self.client_instance {
1865            let mq_client_api = client_instance.get_mq_client_api_impl();
1866            mq_client_api
1867                .delete_acl(broker_addr, subject, resource, self.timeout_millis.as_millis() as u64)
1868                .await
1869        } else {
1870            Err(rocketmq_error::RocketMQError::ClientNotStarted)
1871        }
1872    }
1873
1874    async fn create_lite_pull_topic(
1875        &self,
1876        _addr: CheetahString,
1877        _topic: CheetahString,
1878        _queue_num: i32,
1879        _topic_sys_flag: i32,
1880        _read_queue_nums: i32,
1881        _write_queue_nums: i32,
1882    ) -> rocketmq_error::RocketMQResult<()> {
1883        unimplemented!("create_lite_pull_topic not implemented yet")
1884    }
1885
1886    async fn update_lite_pull_topic(
1887        &self,
1888        _addr: CheetahString,
1889        _topic: CheetahString,
1890        _read_queue_nums: i32,
1891        _write_queue_nums: i32,
1892    ) -> rocketmq_error::RocketMQResult<()> {
1893        unimplemented!("update_lite_pull_topic not implemented yet")
1894    }
1895
1896    async fn get_lite_pull_topic(
1897        &self,
1898        _addr: CheetahString,
1899        _topic: CheetahString,
1900    ) -> rocketmq_error::RocketMQResult<TopicConfig> {
1901        unimplemented!("get_lite_pull_topic not implemented yet")
1902    }
1903
1904    async fn delete_lite_pull_topic(
1905        &self,
1906        _addr: CheetahString,
1907        _cluster_name: CheetahString,
1908        _topic: CheetahString,
1909    ) -> rocketmq_error::RocketMQResult<()> {
1910        unimplemented!("delete_lite_pull_topic not implemented yet")
1911    }
1912
1913    async fn query_lite_pull_topic_list(&self, _addr: CheetahString) -> rocketmq_error::RocketMQResult<TopicList> {
1914        unimplemented!("query_lite_pull_topic_list not implemented yet")
1915    }
1916
1917    async fn query_lite_pull_topic_by_cluster(
1918        &self,
1919        _cluster_name: CheetahString,
1920    ) -> rocketmq_error::RocketMQResult<TopicList> {
1921        unimplemented!("query_lite_pull_topic_by_cluster not implemented yet")
1922    }
1923
1924    async fn query_lite_pull_subscription_list(
1925        &self,
1926        _addr: CheetahString,
1927        _topic: CheetahString,
1928    ) -> rocketmq_error::RocketMQResult<GroupList> {
1929        unimplemented!("query_lite_pull_subscription_list not implemented yet")
1930    }
1931
1932    async fn update_lite_pull_consumer_offset(
1933        &self,
1934        _addr: CheetahString,
1935        _topic: CheetahString,
1936        _group: CheetahString,
1937        _queue_id: i32,
1938        _offset: u64,
1939    ) -> rocketmq_error::RocketMQResult<()> {
1940        unimplemented!("update_lite_pull_consumer_offset not implemented yet")
1941    }
1942
1943    async fn examine_consume_stats_with_queue(
1944        &self,
1945        _consumer_group: CheetahString,
1946        _topic: Option<CheetahString>,
1947        _queue_id: Option<i32>,
1948    ) -> rocketmq_error::RocketMQResult<ConsumeStats> {
1949        unimplemented!("examine_consume_stats_with_queue not implemented yet")
1950    }
1951
1952    async fn examine_consume_stats_concurrent(
1953        &self,
1954        _consumer_group: CheetahString,
1955        _topic: Option<CheetahString>,
1956    ) -> AdminToolResult<ConsumeStats> {
1957        unimplemented!("examine_consume_stats_concurrent not implemented yet")
1958    }
1959
1960    async fn examine_consume_stats_concurrent_with_cluster(
1961        &self,
1962        _consumer_group: CheetahString,
1963        _topic: Option<CheetahString>,
1964        _cluster_name: Option<CheetahString>,
1965    ) -> AdminToolResult<ConsumeStats> {
1966        unimplemented!("examine_consume_stats_concurrent_with_cluster not implemented yet")
1967    }
1968
1969    async fn export_rocksdb_consumer_offset_to_json(
1970        &self,
1971        _broker_addr: CheetahString,
1972        _file_path: CheetahString,
1973    ) -> rocketmq_error::RocketMQResult<()> {
1974        unimplemented!("export_rocksdb_consumer_offset_to_json not implemented yet")
1975    }
1976
1977    async fn export_rocksdb_consumer_offset_from_memory(
1978        &self,
1979        _broker_addr: CheetahString,
1980    ) -> rocketmq_error::RocketMQResult<CheetahString> {
1981        unimplemented!("export_rocksdb_consumer_offset_from_memory not implemented yet")
1982    }
1983
1984    async fn sync_broker_member_group(
1985        &self,
1986        _controller_addr: CheetahString,
1987        _cluster_name: CheetahString,
1988        _broker_name: CheetahString,
1989    ) -> rocketmq_error::RocketMQResult<()> {
1990        unimplemented!("sync_broker_member_group not implemented yet")
1991    }
1992
1993    async fn get_topic_config_by_topic_name(
1994        &self,
1995        _broker_addr: CheetahString,
1996        _topic_name: CheetahString,
1997    ) -> rocketmq_error::RocketMQResult<TopicConfig> {
1998        unimplemented!("get_topic_config_by_topic_name not implemented yet")
1999    }
2000
2001    async fn notify_min_broker_id_changed(
2002        &self,
2003        _cluster_name: CheetahString,
2004        _broker_name: CheetahString,
2005        _min_broker_id: u64,
2006        _min_broker_addr: CheetahString,
2007        _offline_broker_addr: Option<CheetahString>,
2008        _ha_broker_addr: Option<CheetahString>,
2009    ) -> rocketmq_error::RocketMQResult<()> {
2010        unimplemented!("notify_min_broker_id_changed not implemented yet")
2011    }
2012
2013    async fn get_topic_stats_info(
2014        &self,
2015        _broker_addr: CheetahString,
2016        _topic: CheetahString,
2017    ) -> rocketmq_error::RocketMQResult<TopicStatsTable> {
2018        unimplemented!("get_topic_stats_info not implemented yet")
2019    }
2020
2021    async fn query_broker_has_topic(
2022        &self,
2023        _broker_addr: CheetahString,
2024        _topic: CheetahString,
2025    ) -> rocketmq_error::RocketMQResult<bool> {
2026        unimplemented!("query_broker_has_topic not implemented yet")
2027    }
2028
2029    async fn get_system_topic_list_from_broker(
2030        &self,
2031        _broker_addr: CheetahString,
2032    ) -> rocketmq_error::RocketMQResult<TopicList> {
2033        unimplemented!("get_system_topic_list_from_broker not implemented yet")
2034    }
2035
2036    async fn examine_topic_route_info_with_timeout(
2037        &self,
2038        _topic: CheetahString,
2039        _timeout_millis: u64,
2040    ) -> rocketmq_error::RocketMQResult<Option<TopicRouteData>> {
2041        unimplemented!("examine_topic_route_info_with_timeout not implemented yet")
2042    }
2043
2044    async fn export_pop_records(
2045        &self,
2046        _broker_addr: CheetahString,
2047        _timeout: u64,
2048    ) -> rocketmq_error::RocketMQResult<()> {
2049        unimplemented!("export_pop_records not implemented yet")
2050    }
2051
2052    async fn switch_timer_engine(
2053        &self,
2054        _broker_addr: CheetahString,
2055        _des_timer_engine: CheetahString,
2056    ) -> rocketmq_error::RocketMQResult<()> {
2057        unimplemented!("switch_timer_engine not implemented yet")
2058    }
2059
2060    async fn trigger_lite_dispatch(
2061        &self,
2062        _broker_addr: CheetahString,
2063        _group: CheetahString,
2064        _client_id: CheetahString,
2065    ) -> rocketmq_error::RocketMQResult<()> {
2066        unimplemented!("trigger_lite_dispatch not implemented yet")
2067    }
2068    #[allow(deprecated)]
2069    async fn delete_topic_in_broker_concurrent(
2070        &self,
2071        _addrs: HashSet<CheetahString>,
2072        _topic: CheetahString,
2073    ) -> AdminToolResult<BrokerOperatorResult> {
2074        unimplemented!("delete_topic_in_broker_concurrent not implemented yet")
2075    }
2076
2077    async fn reset_offset_by_timestamp_old(
2078        &self,
2079        cluster_name: Option<CheetahString>,
2080        consumer_group: CheetahString,
2081        topic: CheetahString,
2082        timestamp: u64,
2083        force: bool,
2084    ) -> rocketmq_error::RocketMQResult<Vec<RollbackStats>> {
2085        let mut route_topic = topic.clone();
2086        if !topic.is_empty()
2087            && (mix_all::is_lmq(Some(topic.as_str()))
2088                || topic.as_str() == format!("{}wheel_timer", TopicValidator::SYSTEM_TOPIC_PREFIX))
2089            && cluster_name.as_ref().is_some_and(|name| !name.is_empty())
2090        {
2091            route_topic = cluster_name.unwrap();
2092        }
2093        let topic_route_data = self.examine_topic_route_info(route_topic).await?;
2094        let mut rollback_stats_list = Vec::new();
2095
2096        if let Some(route_data) = topic_route_data {
2097            let mut topic_route_map = HashMap::new();
2098            for queue_data in &route_data.queue_datas {
2099                topic_route_map.insert(queue_data.broker_name().to_string(), queue_data.clone());
2100            }
2101
2102            for broker_data in &route_data.broker_datas {
2103                if let Some(addr) = broker_data.select_broker_addr() {
2104                    if let Some(queue_data) = topic_route_map.get(broker_data.broker_name().as_str()) {
2105                        let mut rollback_stats = self
2106                            .reset_offset_by_timestamp_old_on_broker(
2107                                addr,
2108                                queue_data,
2109                                consumer_group.clone(),
2110                                topic.clone(),
2111                                timestamp as i64,
2112                                force,
2113                            )
2114                            .await?;
2115                        rollback_stats_list.append(&mut rollback_stats);
2116                    }
2117                }
2118            }
2119        }
2120
2121        Ok(rollback_stats_list)
2122    }
2123    #[allow(deprecated)]
2124    async fn reset_offset_new_concurrent(
2125        &self,
2126        _group: CheetahString,
2127        _topic: CheetahString,
2128        _timestamp: u64,
2129    ) -> AdminToolResult<BrokerOperatorResult> {
2130        unimplemented!("reset_offset_new_concurrent not implemented yet")
2131    }
2132
2133    async fn query_consume_time_span(
2134        &self,
2135        _topic: CheetahString,
2136        _group: CheetahString,
2137    ) -> rocketmq_error::RocketMQResult<Vec<QueueTimeSpan>> {
2138        unimplemented!("query_consume_time_span not implemented yet")
2139    }
2140
2141    async fn query_consume_time_span_concurrent(
2142        &self,
2143        _topic: CheetahString,
2144        _group: CheetahString,
2145    ) -> AdminToolResult<Vec<QueueTimeSpan>> {
2146        unimplemented!("query_consume_time_span_concurrent not implemented yet")
2147    }
2148    #[allow(deprecated)]
2149    async fn message_track_detail(&self, msg: MessageExt) -> rocketmq_error::RocketMQResult<Vec<MessageTrack>> {
2150        let group_list = self.query_topic_consume_by_who(msg.topic().clone()).await?;
2151        let mut result = Vec::with_capacity(group_list.get_group_list().len());
2152
2153        for group in group_list.get_group_list() {
2154            let mut track = build_message_track(group.as_str());
2155            let consumer_connection = match self.examine_consumer_connection_info(group.clone(), None).await {
2156                Ok(connection) => connection,
2157                Err(error) => {
2158                    apply_track_error(&mut track, &error);
2159                    result.push(track);
2160                    continue;
2161                }
2162            };
2163
2164            match consumer_connection.get_consume_type() {
2165                Some(ConsumeType::ConsumeActively) => {
2166                    track.set_track_type(TrackType::Pull);
2167                }
2168                Some(ConsumeType::ConsumePassively) => {
2169                    if consumer_connection.get_message_model() == Some(MessageModel::Broadcasting) {
2170                        track.set_track_type(TrackType::ConsumeBroadcasting);
2171                        result.push(track);
2172                        continue;
2173                    }
2174
2175                    let consumed = match self.message_consumed_by_group(&msg, group).await {
2176                        Ok(consumed) => consumed,
2177                        Err(error) => {
2178                            apply_track_error(&mut track, &error);
2179                            result.push(track);
2180                            continue;
2181                        }
2182                    };
2183
2184                    if consumed {
2185                        track.set_track_type(resolve_consumed_track_type(&msg, &consumer_connection));
2186                    } else {
2187                        track.set_track_type(TrackType::NotConsumedYet);
2188                    }
2189                }
2190                _ => {}
2191            }
2192
2193            result.push(track);
2194        }
2195
2196        result.sort_by(|left, right| left.consumer_group.cmp(&right.consumer_group));
2197        Ok(result)
2198    }
2199    #[allow(deprecated)]
2200    async fn message_track_detail_concurrent(&self, msg: MessageExt) -> AdminToolResult<Vec<MessageTrack>> {
2201        match self.message_track_detail(msg).await {
2202            Ok(data) => AdminToolResult::success(data),
2203            Err(error) => AdminToolResult::failure(admin_result_code_for_error(&error), error.to_string()),
2204        }
2205    }
2206
2207    async fn view_broker_stats_data(
2208        &self,
2209        broker_addr: CheetahString,
2210        stats_name: CheetahString,
2211        stats_key: CheetahString,
2212    ) -> rocketmq_error::RocketMQResult<BrokerStatsData> {
2213        let request_header = ViewBrokerStatsDataRequestHeader { stats_name, stats_key };
2214        self.client_instance
2215            .as_ref()
2216            .unwrap()
2217            .get_mq_client_api_impl()
2218            .view_broker_stats_data(&broker_addr, request_header, self.timeout_millis.as_millis() as u64)
2219            .await
2220    }
2221
2222    async fn fetch_consume_stats_in_broker(
2223        &self,
2224        _broker_addr: CheetahString,
2225        _is_order: bool,
2226        _timeout_millis: u64,
2227    ) -> rocketmq_error::RocketMQResult<ConsumeStatsList> {
2228        unimplemented!("fetch_consume_stats_in_broker not implemented yet")
2229    }
2230
2231    async fn get_all_subscription_group(
2232        &self,
2233        broker_addr: CheetahString,
2234        timeout_millis: u64,
2235    ) -> rocketmq_error::RocketMQResult<SubscriptionGroupWrapper> {
2236        self.client_instance
2237            .as_ref()
2238            .unwrap()
2239            .get_mq_client_api_impl()
2240            .get_all_subscription_group_config(&broker_addr, timeout_millis)
2241            .await
2242    }
2243
2244    async fn get_user_subscription_group(
2245        &self,
2246        broker_addr: CheetahString,
2247        timeout_millis: u64,
2248    ) -> rocketmq_error::RocketMQResult<SubscriptionGroupWrapper> {
2249        let subscription_group_wrapper = self.get_all_subscription_group(broker_addr, timeout_millis).await?;
2250
2251        let system_group_set = get_system_group_set();
2252        let table = subscription_group_wrapper.get_subscription_group_table();
2253        // Remove system consumer groups
2254        table.retain(|key, _| !mix_all::is_sys_consumer_group(key.as_str()) && !system_group_set.contains(key));
2255
2256        Ok(subscription_group_wrapper)
2257    }
2258
2259    async fn query_consume_queue(
2260        &self,
2261        broker_addr: CheetahString,
2262        topic: CheetahString,
2263        queue_id: i32,
2264        index: u64,
2265        count: i32,
2266        consumer_group: CheetahString,
2267    ) -> rocketmq_error::RocketMQResult<QueryConsumeQueueResponseBody> {
2268        self.client_instance
2269            .as_ref()
2270            .unwrap()
2271            .get_mq_client_api_impl()
2272            .query_consume_queue(
2273                &broker_addr,
2274                topic,
2275                queue_id,
2276                index as i64,
2277                count,
2278                consumer_group,
2279                self.timeout_millis.as_millis() as u64,
2280            )
2281            .await
2282    }
2283
2284    async fn update_and_get_group_read_forbidden(
2285        &self,
2286        _broker_addr: CheetahString,
2287        _group_name: CheetahString,
2288        _topic_name: CheetahString,
2289        _readable: Option<bool>,
2290    ) -> rocketmq_error::RocketMQResult<GroupForbidden> {
2291        unimplemented!("update_and_get_group_read_forbidden not implemented yet")
2292    }
2293
2294    async fn query_message(
2295        &self,
2296        _cluster_name: CheetahString,
2297        topic: CheetahString,
2298        msg_id: CheetahString,
2299    ) -> rocketmq_error::RocketMQResult<MessageExt> {
2300        let client_instance = self
2301            .client_instance
2302            .as_ref()
2303            .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?;
2304
2305        let msg_id_str = msg_id.as_str();
2306
2307        if let Err(e) = message_decoder::validate_message_id(msg_id_str) {
2308            return Err(rocketmq_error::RocketMQError::IllegalArgument(format!(
2309                "Invalid message ID: {}",
2310                e
2311            )));
2312        }
2313
2314        let message_id = message_decoder::decode_message_id(msg_id_str).map_err(|e| {
2315            rocketmq_error::RocketMQError::IllegalArgument(format!("Failed to decode message ID: {}", e))
2316        })?;
2317        let broker_addr =
2318            CheetahString::from_string(format!("{}:{}", message_id.address.ip(), message_id.address.port()));
2319
2320        let request_header = ViewMessageRequestHeader {
2321            topic: Some(topic),
2322            offset: message_id.offset,
2323        };
2324
2325        client_instance
2326            .get_mq_client_api_impl()
2327            .view_message(&broker_addr, request_header, self.timeout_millis.as_millis() as u64)
2328            .await
2329    }
2330
2331    async fn get_broker_ha_status(&self, broker_addr: CheetahString) -> rocketmq_error::RocketMQResult<HARuntimeInfo> {
2332        if let Some(ref mq_client_instance) = self.client_instance {
2333            Ok(mq_client_instance
2334                .get_mq_client_api_impl()
2335                .get_broker_ha_status(broker_addr, self.timeout_millis.as_millis() as u64)
2336                .await?)
2337        } else {
2338            Err(rocketmq_error::RocketMQError::ClientNotStarted)
2339        }
2340    }
2341
2342    async fn get_in_sync_state_data(
2343        &self,
2344        controller_address: CheetahString,
2345        brokers: Vec<CheetahString>,
2346    ) -> rocketmq_error::RocketMQResult<BrokerReplicasInfo> {
2347        if let Some(ref mq_client_instance) = self.client_instance {
2348            Ok(mq_client_instance
2349                .get_mq_client_api_impl()
2350                .get_in_sync_state_data(controller_address, brokers, self.timeout_millis.as_millis() as u64)
2351                .await?)
2352        } else {
2353            Err(rocketmq_error::RocketMQError::ClientNotStarted)
2354        }
2355    }
2356
2357    async fn get_broker_epoch_cache(
2358        &self,
2359        broker_addr: CheetahString,
2360    ) -> rocketmq_error::RocketMQResult<EpochEntryCache> {
2361        if let Some(ref mq_client_instance) = self.client_instance {
2362            Ok(mq_client_instance
2363                .get_mq_client_api_impl()
2364                .get_broker_epoch_cache(broker_addr, self.timeout_millis.as_millis() as u64)
2365                .await?)
2366        } else {
2367            Err(rocketmq_error::RocketMQError::ClientNotStarted)
2368        }
2369    }
2370
2371    async fn elect_master(
2372        &self,
2373        _controller_addr: CheetahString,
2374        _cluster_name: CheetahString,
2375        _broker_name: CheetahString,
2376        _broker_id: Option<u64>,
2377    ) -> rocketmq_error::RocketMQResult<(ElectMasterResponseHeader, BrokerMemberGroup)> {
2378        unimplemented!("elect_master not implemented yet")
2379    }
2380
2381    async fn create_user_with_info(
2382        &self,
2383        _broker_addr: CheetahString,
2384        _username: CheetahString,
2385        _password: CheetahString,
2386    ) -> rocketmq_error::RocketMQResult<()> {
2387        unimplemented!("create_user_with_info not implemented yet")
2388    }
2389
2390    async fn update_user_with_info(
2391        &self,
2392        _broker_addr: CheetahString,
2393        _username: CheetahString,
2394        _password: CheetahString,
2395    ) -> rocketmq_error::RocketMQResult<()> {
2396        unimplemented!("update_user_with_info not implemented yet")
2397    }
2398
2399    async fn get_user(
2400        &self,
2401        broker_addr: CheetahString,
2402        username: CheetahString,
2403    ) -> rocketmq_error::RocketMQResult<Option<UserInfo>> {
2404        if let Some(ref mq_client_instance) = self.client_instance {
2405            let mq_client_api = mq_client_instance.get_mq_client_api_impl();
2406            let timeout_millis = self.timeout_millis.as_millis() as u64;
2407            let result = mq_client_api.get_user(broker_addr, username, timeout_millis).await?;
2408            Ok(result)
2409        } else {
2410            Err(rocketmq_error::RocketMQError::ClientNotStarted)
2411        }
2412    }
2413
2414    async fn list_users(
2415        &self,
2416        broker_addr: CheetahString,
2417        filter: CheetahString,
2418    ) -> rocketmq_error::RocketMQResult<Vec<UserInfo>> {
2419        if let Some(ref mq_client_instance) = self.client_instance {
2420            let mq_client_api = mq_client_instance.get_mq_client_api_impl();
2421            let timeout_millis = self.timeout_millis.as_millis() as u64;
2422            let result = mq_client_api.list_users(broker_addr, filter, timeout_millis).await?;
2423            Ok(result)
2424        } else {
2425            Err(rocketmq_error::RocketMQError::ClientNotStarted)
2426        }
2427    }
2428
2429    async fn create_acl_with_info(
2430        &self,
2431        _broker_addr: CheetahString,
2432        _subject: CheetahString,
2433    ) -> rocketmq_error::RocketMQResult<()> {
2434        unimplemented!("create_acl_with_info not implemented yet")
2435    }
2436
2437    async fn update_acl_with_info(
2438        &self,
2439        _broker_addr: CheetahString,
2440        _subject: CheetahString,
2441    ) -> rocketmq_error::RocketMQResult<()> {
2442        unimplemented!("update_acl_with_info not implemented yet")
2443    }
2444
2445    async fn get_acl(
2446        &self,
2447        _broker_addr: CheetahString,
2448        _subject: CheetahString,
2449    ) -> rocketmq_error::RocketMQResult<AclInfo> {
2450        unimplemented!("get_acl not implemented yet")
2451    }
2452
2453    async fn list_acl(
2454        &self,
2455        broker_addr: CheetahString,
2456        subject_filter: CheetahString,
2457        resource_filter: CheetahString,
2458    ) -> rocketmq_error::RocketMQResult<Vec<AclInfo>> {
2459        if let Some(ref mq_client_instance) = self.client_instance {
2460            let mq_client_api = mq_client_instance.get_mq_client_api_impl();
2461            let timeout_millis = self.timeout_millis.as_millis() as u64;
2462            let result = mq_client_api
2463                .list_acl(broker_addr, subject_filter, resource_filter, timeout_millis)
2464                .await?;
2465            Ok(result)
2466        } else {
2467            Err(rocketmq_error::RocketMQError::ClientNotStarted)
2468        }
2469    }
2470
2471    async fn get_broker_lite_info(
2472        &self,
2473        broker_addr: CheetahString,
2474    ) -> rocketmq_error::RocketMQResult<GetBrokerLiteInfoResponseBody> {
2475        if let Some(ref mq_client_instance) = self.client_instance {
2476            mq_client_instance
2477                .get_mq_client_api_impl()
2478                .get_broker_lite_info(&broker_addr, self.timeout_millis.as_millis() as u64)
2479                .await
2480        } else {
2481            Err(rocketmq_error::RocketMQError::ClientNotStarted)
2482        }
2483    }
2484
2485    async fn get_parent_topic_info(
2486        &self,
2487        broker_addr: CheetahString,
2488        topic: CheetahString,
2489    ) -> rocketmq_error::RocketMQResult<GetParentTopicInfoResponseBody> {
2490        self.client_instance
2491            .as_ref()
2492            .unwrap()
2493            .get_mq_client_api_impl()
2494            .get_parent_topic_info(&broker_addr, topic, self.timeout_millis.as_millis() as u64)
2495            .await
2496    }
2497
2498    async fn get_lite_topic_info(
2499        &self,
2500        broker_addr: CheetahString,
2501        parent_topic: CheetahString,
2502        lite_topic: CheetahString,
2503    ) -> rocketmq_error::RocketMQResult<GetLiteTopicInfoResponseBody> {
2504        if let Some(ref mq_client_instance) = self.client_instance {
2505            mq_client_instance
2506                .get_mq_client_api_impl()
2507                .get_lite_topic_info(
2508                    &broker_addr,
2509                    &parent_topic,
2510                    &lite_topic,
2511                    self.timeout_millis.as_millis() as u64,
2512                )
2513                .await
2514        } else {
2515            Err(rocketmq_error::RocketMQError::ClientNotStarted)
2516        }
2517    }
2518
2519    async fn get_lite_client_info(
2520        &self,
2521        _broker_addr: CheetahString,
2522        _parent_topic: CheetahString,
2523        _group: CheetahString,
2524        _client_id: CheetahString,
2525    ) -> rocketmq_error::RocketMQResult<GetLiteClientInfoResponseBody> {
2526        unimplemented!("get_lite_client_info not implemented yet")
2527    }
2528
2529    async fn get_lite_group_info(
2530        &self,
2531        broker_addr: CheetahString,
2532        group: CheetahString,
2533        lite_topic: CheetahString,
2534        top_k: i32,
2535    ) -> rocketmq_error::RocketMQResult<GetLiteGroupInfoResponseBody> {
2536        self.client_instance
2537            .as_ref()
2538            .unwrap()
2539            .get_mq_client_api_impl()
2540            .get_lite_group_info(
2541                &broker_addr,
2542                group,
2543                lite_topic,
2544                top_k,
2545                self.timeout_millis.as_millis() as u64,
2546            )
2547            .await
2548    }
2549
2550    async fn export_rocksdb_config_to_json(
2551        &self,
2552        _broker_addr: CheetahString,
2553        _config_types: Vec<CheetahString>,
2554    ) -> rocketmq_error::RocketMQResult<()> {
2555        unimplemented!("export_rocksdb_config_to_json not implemented yet")
2556    }
2557
2558    async fn search_offset(
2559        &self,
2560        broker_addr: CheetahString,
2561        topic_name: CheetahString,
2562        queue_id: i32,
2563        timestamp: u64,
2564        timeout_millis: u64,
2565    ) -> rocketmq_error::RocketMQResult<u64> {
2566        let mq = MessageQueue::from_parts(&topic_name, "", queue_id);
2567        let offset = self
2568            .client_instance
2569            .as_ref()
2570            .unwrap()
2571            .get_mq_client_api_impl()
2572            .search_offset_by_timestamp(
2573                broker_addr.as_str(),
2574                &mq,
2575                timestamp as i64,
2576                rocketmq_common::common::boundary_type::BoundaryType::Lower,
2577                timeout_millis,
2578            )
2579            .await?;
2580        Ok(offset as u64)
2581    }
2582
2583    async fn min_offset(
2584        &self,
2585        broker_addr: CheetahString,
2586        message_queue: MessageQueue,
2587        timeout_millis: u64,
2588    ) -> rocketmq_error::RocketMQResult<i64> {
2589        self.client_instance
2590            .as_ref()
2591            .unwrap()
2592            .get_mq_client_api_impl()
2593            .get_min_offset(broker_addr.as_str(), &message_queue, timeout_millis)
2594            .await
2595    }
2596
2597    async fn max_offset(
2598        &self,
2599        broker_addr: CheetahString,
2600        message_queue: MessageQueue,
2601        timeout_millis: u64,
2602    ) -> rocketmq_error::RocketMQResult<i64> {
2603        self.client_instance
2604            .as_ref()
2605            .unwrap()
2606            .get_mq_client_api_impl()
2607            .get_max_offset(broker_addr.as_str(), &message_queue, timeout_millis)
2608            .await
2609    }
2610}
2611
2612impl DefaultMQAdminExtImpl {
2613    async fn reset_offset_by_timestamp_old_on_broker(
2614        &self,
2615        broker_addr: CheetahString,
2616        queue_data: &QueueData,
2617        consumer_group: CheetahString,
2618        topic: CheetahString,
2619        timestamp: i64,
2620        force: bool,
2621    ) -> rocketmq_error::RocketMQResult<Vec<RollbackStats>> {
2622        let consume_stats = self
2623            .client_instance
2624            .as_ref()
2625            .unwrap()
2626            .get_mq_client_api_impl()
2627            .get_consume_stats(
2628                &broker_addr,
2629                GetConsumeStatsRequestHeader {
2630                    consumer_group: consumer_group.clone(),
2631                    topic: CheetahString::empty(),
2632                    topic_request_header: None,
2633                },
2634                self.timeout_millis.as_millis() as u64,
2635            )
2636            .await?;
2637
2638        let mut rollback_stats_list = Vec::new();
2639        let mut has_consumed = false;
2640
2641        for (queue, offset_wrapper) in &consume_stats.offset_table {
2642            if queue.topic() == &topic {
2643                has_consumed = true;
2644                rollback_stats_list.push(
2645                    self.reset_offset_consume_offset(
2646                        broker_addr.clone(),
2647                        consumer_group.clone(),
2648                        queue.clone(),
2649                        offset_wrapper,
2650                        timestamp,
2651                        force,
2652                    )
2653                    .await?,
2654                );
2655            }
2656        }
2657
2658        if !has_consumed {
2659            let topic_status = self
2660                .client_instance
2661                .as_ref()
2662                .unwrap()
2663                .get_mq_client_api_impl()
2664                .get_topic_stats_info(
2665                    &broker_addr,
2666                    GetTopicStatsInfoRequestHeader {
2667                        topic: topic.clone(),
2668                        topic_request_header: None,
2669                    },
2670                    self.timeout_millis.as_millis() as u64,
2671                )
2672                .await?;
2673
2674            for queue_id in 0..queue_data.read_queue_nums() {
2675                let queue = MessageQueue::from_parts(topic.clone(), queue_data.broker_name().clone(), queue_id as i32);
2676                let mut offset_wrapper = OffsetWrapper::new();
2677                let topic_offset = topic_status
2678                    .get_offset_table()
2679                    .get(&queue)
2680                    .cloned()
2681                    .unwrap_or_else(TopicOffset::new);
2682                offset_wrapper.set_broker_offset(topic_offset.get_max_offset());
2683                offset_wrapper.set_consumer_offset(topic_offset.get_min_offset());
2684                rollback_stats_list.push(
2685                    self.reset_offset_consume_offset(
2686                        broker_addr.clone(),
2687                        consumer_group.clone(),
2688                        queue,
2689                        &offset_wrapper,
2690                        timestamp,
2691                        force,
2692                    )
2693                    .await?,
2694                );
2695            }
2696        }
2697
2698        Ok(rollback_stats_list)
2699    }
2700
2701    async fn reset_offset_consume_offset(
2702        &self,
2703        broker_addr: CheetahString,
2704        consumer_group: CheetahString,
2705        queue: MessageQueue,
2706        offset_wrapper: &OffsetWrapper,
2707        timestamp: i64,
2708        force: bool,
2709    ) -> rocketmq_error::RocketMQResult<RollbackStats> {
2710        let reset_offset = if timestamp == -1 {
2711            self.client_instance
2712                .as_ref()
2713                .unwrap()
2714                .get_mq_client_api_impl()
2715                .get_max_offset(broker_addr.as_str(), &queue, self.timeout_millis.as_millis() as u64)
2716                .await?
2717        } else {
2718            self.client_instance
2719                .as_ref()
2720                .unwrap()
2721                .get_mq_client_api_impl()
2722                .search_offset_by_timestamp(
2723                    broker_addr.as_str(),
2724                    &queue,
2725                    timestamp,
2726                    rocketmq_common::common::boundary_type::BoundaryType::Lower,
2727                    self.timeout_millis.as_millis() as u64,
2728                )
2729                .await?
2730        };
2731
2732        let mut rollback_stats = RollbackStats {
2733            broker_name: queue.broker_name().clone(),
2734            queue_id: queue.queue_id() as i64,
2735            broker_offset: offset_wrapper.get_broker_offset(),
2736            consumer_offset: offset_wrapper.get_consumer_offset(),
2737            timestamp_offset: reset_offset,
2738            rollback_offset: offset_wrapper.get_consumer_offset(),
2739        };
2740
2741        if force || reset_offset <= offset_wrapper.get_consumer_offset() {
2742            rollback_stats.rollback_offset = reset_offset;
2743            self.client_instance
2744                .as_ref()
2745                .unwrap()
2746                .get_mq_client_api_impl()
2747                .update_consumer_offset(
2748                    &broker_addr,
2749                    UpdateConsumerOffsetRequestHeader {
2750                        consumer_group,
2751                        topic: queue.topic().clone(),
2752                        queue_id: queue.queue_id(),
2753                        commit_offset: reset_offset,
2754                        topic_request_header: None,
2755                    },
2756                    self.timeout_millis.as_millis() as u64,
2757                )
2758                .await?;
2759        }
2760
2761        Ok(rollback_stats)
2762    }
2763
2764    async fn message_consumed_by_group(
2765        &self,
2766        msg: &MessageExt,
2767        group: &CheetahString,
2768    ) -> rocketmq_error::RocketMQResult<bool> {
2769        let consume_stats = self
2770            .examine_consume_stats(group.clone(), None, None, None, None)
2771            .await?;
2772        let cluster_info = self.examine_broker_cluster_info().await?;
2773
2774        Ok(is_message_consumed(msg, &consume_stats, &cluster_info))
2775    }
2776}
2777
/// Merges one order-conf entry into an existing `;`-separated list.
///
/// Entries have the form `broker:value` and are keyed by the text before the
/// first `:`. Existing items without a `:` are dropped. `value` replaces any
/// existing entry with the same key; a non-blank `value` without `:` is keyed
/// by itself, and a blank one is ignored. The result is joined with `;` in
/// ascending key order.
fn merge_order_conf_entries(existing: &str, value: &str) -> String {
    // An ordered map yields the entries sorted by broker name on iteration.
    // Blank items never contain ':' and are therefore skipped by `split_once`.
    let mut by_broker: std::collections::BTreeMap<&str, &str> = existing
        .split(';')
        .filter_map(|item| item.split_once(':').map(|(broker, _)| (broker, item)))
        .collect();

    match value.split_once(':') {
        Some((broker, _)) => {
            by_broker.insert(broker, value);
        }
        None if !value.trim().is_empty() => {
            by_broker.insert(value, value);
        }
        None => {}
    }

    by_broker.into_values().collect::<Vec<_>>().join(";")
}
2799
2800fn select_consumer_direct_connection(
2801    consumer_group: &CheetahString,
2802    consumer_connection: &ConsumerConnection,
2803    requested_client_id: Option<&CheetahString>,
2804) -> rocketmq_error::RocketMQResult<(CheetahString, CheetahString)> {
2805    let requested = requested_client_id.filter(|client_id| !client_id.is_empty());
2806    let connection = consumer_connection
2807        .get_connection_set()
2808        .iter()
2809        .find(|connection| {
2810            requested
2811                .map(|client_id| connection.get_client_id() == *client_id)
2812                .unwrap_or_else(|| !connection.get_client_id().is_empty())
2813        })
2814        .ok_or_else(|| {
2815            let message = requested
2816                .map(|client_id| {
2817                    format!(
2818                        "Client `{}` was not found in consumer group `{}`",
2819                        client_id, consumer_group
2820                    )
2821                })
2822                .unwrap_or_else(|| format!("NO CONSUMER for consumer group `{}`", consumer_group));
2823            rocketmq_error::RocketMQError::IllegalArgument(message)
2824        })?;
2825
2826    Ok((connection.get_client_id(), connection.get_client_addr()))
2827}
2828
2829#[allow(deprecated)]
2830fn build_message_track(consumer_group: &str) -> MessageTrack {
2831    MessageTrack {
2832        consumer_group: consumer_group.to_string(),
2833        track_type: Some(TrackType::Unknown),
2834        exception_desc: String::new(),
2835    }
2836}
2837
2838#[allow(deprecated)]
2839fn resolve_consumed_track_type(msg: &MessageExt, consumer_connection: &ConsumerConnection) -> TrackType {
2840    let Some(subscription_data) = consumer_connection.get_subscription_table().get(msg.topic()) else {
2841        return TrackType::Consumed;
2842    };
2843
2844    let Some(message_tag) = msg.get_tags() else {
2845        return TrackType::Consumed;
2846    };
2847
2848    if subscription_data.tags_set.is_empty()
2849        || subscription_data
2850            .tags_set
2851            .contains(&CheetahString::from_static_str(SubscriptionData::SUB_ALL))
2852        || subscription_data.tags_set.contains(&message_tag)
2853    {
2854        TrackType::Consumed
2855    } else {
2856        TrackType::ConsumedButFiltered
2857    }
2858}
2859
2860fn is_message_consumed(msg: &MessageExt, consume_stats: &ConsumeStats, cluster_info: &ClusterInfo) -> bool {
2861    consume_stats.get_offset_table().iter().any(|(queue, offset_wrapper)| {
2862        queue.topic() == msg.topic()
2863            && queue.queue_id() == msg.queue_id()
2864            && resolve_master_broker_addr(cluster_info, queue)
2865                .map(|broker_addr| {
2866                    broker_addr_matches_store_host(broker_addr, msg.store_host())
2867                        && offset_wrapper.get_consumer_offset() > msg.queue_offset()
2868                })
2869                .unwrap_or(false)
2870    })
2871}
2872
2873fn resolve_master_broker_addr<'a>(cluster_info: &'a ClusterInfo, queue: &MessageQueue) -> Option<&'a CheetahString> {
2874    cluster_info
2875        .broker_addr_table
2876        .as_ref()?
2877        .get(queue.broker_name())?
2878        .broker_addrs()
2879        .get(&mix_all::MASTER_ID)
2880}
2881
2882fn broker_addr_matches_store_host(broker_addr: &CheetahString, store_host: std::net::SocketAddr) -> bool {
2883    broker_addr
2884        .parse::<std::net::SocketAddr>()
2885        .map(|parsed| parsed == store_host)
2886        .unwrap_or_else(|_| broker_addr.as_str() == store_host.to_string())
2887}
2888
2889#[allow(deprecated)]
2890fn apply_track_error(track: &mut MessageTrack, error: &RocketMQError) {
2891    if let Some(code) = response_code_from_error(error) {
2892        match code {
2893            ResponseCode::ConsumerNotOnline => track.set_track_type(TrackType::NotOnline),
2894            ResponseCode::BroadcastConsumption => track.set_track_type(TrackType::ConsumeBroadcasting),
2895            _ => {}
2896        }
2897    }
2898
2899    track.set_exception_desc(track_exception_desc(error));
2900}
2901
2902fn response_code_from_error(error: &RocketMQError) -> Option<ResponseCode> {
2903    match error {
2904        RocketMQError::BrokerOperationFailed { code, .. } => Some(ResponseCode::from(*code)),
2905        RocketMQError::IllegalArgument(message) => parse_response_code_from_message(message),
2906        _ => None,
2907    }
2908}
2909
2910fn parse_response_code_from_message(message: &str) -> Option<ResponseCode> {
2911    let code_start = message.find("CODE:")?;
2912    let digits = message[code_start + "CODE:".len()..]
2913        .trim_start()
2914        .chars()
2915        .take_while(|ch| ch.is_ascii_digit() || *ch == '-')
2916        .collect::<String>();
2917
2918    if digits.is_empty() {
2919        return None;
2920    }
2921
2922    digits.parse::<i32>().ok().map(ResponseCode::from)
2923}
2924
2925fn track_exception_desc(error: &RocketMQError) -> String {
2926    match error {
2927        RocketMQError::BrokerOperationFailed { code, message, .. } => format!("CODE:{code} DESC:{message}"),
2928        _ => error.to_string(),
2929    }
2930}
2931
2932fn admin_result_code_for_error(error: &RocketMQError) -> AdminToolsResultCodeEnum {
2933    match response_code_from_error(error) {
2934        Some(ResponseCode::ConsumerNotOnline) => AdminToolsResultCodeEnum::ConsumerNotOnline,
2935        Some(ResponseCode::BroadcastConsumption) => AdminToolsResultCodeEnum::BroadcastConsumption,
2936        Some(_) => AdminToolsResultCodeEnum::MQBrokerError,
2937        None => AdminToolsResultCodeEnum::MQClientError,
2938    }
2939}
2940
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;
    use std::collections::HashMap;

    use cheetah_string::CheetahString;
    use rocketmq_common::common::message::message_builder::MessageBuilder;
    use rocketmq_common::common::message::message_ext::MessageExt;
    use rocketmq_common::common::message::message_queue::MessageQueue;
    use rocketmq_common::common::mix_all;
    #[allow(deprecated)]
    use rocketmq_common::common::tools::track_type::TrackType;
    use rocketmq_remoting::code::response_code::ResponseCode;
    use rocketmq_remoting::protocol::admin::consume_stats::ConsumeStats;
    use rocketmq_remoting::protocol::admin::offset_wrapper::OffsetWrapper;
    use rocketmq_remoting::protocol::body::broker_body::cluster_info::ClusterInfo;
    use rocketmq_remoting::protocol::body::connection::Connection;
    use rocketmq_remoting::protocol::body::consumer_connection::ConsumerConnection;
    use rocketmq_remoting::protocol::body::producer_connection::ProducerConnection;
    use rocketmq_remoting::protocol::heartbeat::consume_type::ConsumeType;
    use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
    use rocketmq_remoting::protocol::route::route_data_view::BrokerData;

    use super::encode_topic_attributes;
    use super::is_message_consumed;
    use super::merge_order_conf_entries;
    use super::parse_response_code_from_message;
    use super::resolve_consumed_track_type;
    use super::select_consumer_direct_connection;

    /// Builds a connection carrying the given client id and address.
    fn connection_with(client_id: &'static str, client_addr: &'static str) -> Connection {
        let mut connection = Connection::new();
        connection.set_client_id(client_id.into());
        connection.set_client_addr(client_addr.into());
        connection
    }

    // An entry for an already-listed broker replaces the old value.
    #[test]
    fn merge_order_conf_entries_replaces_existing_broker_value() {
        assert_eq!(
            merge_order_conf_entries("broker-a:4;broker-b:4", "broker-a:8"),
            "broker-a:8;broker-b:4"
        );
    }

    // An entry for a new broker is appended in sorted position.
    #[test]
    fn merge_order_conf_entries_adds_new_broker_value() {
        assert_eq!(
            merge_order_conf_entries("broker-a:4", "broker-b:8"),
            "broker-a:4;broker-b:8"
        );
    }

    #[test]
    fn encode_topic_attributes_matches_java_attribute_parser_format() {
        let mut attributes = HashMap::<CheetahString, CheetahString>::new();
        attributes.insert("+message.type".into(), "NORMAL".into());

        let expected = Some(CheetahString::from("+message.type=NORMAL"));

        assert_eq!(encode_topic_attributes(&attributes), expected);
    }

    #[test]
    fn producer_connection_empty_set_represents_offline_group() {
        let offline = ProducerConnection::new();

        assert!(offline.connection_set().is_empty());
    }

    #[test]
    fn producer_connection_with_entries_represents_online_group() {
        let mut online = ProducerConnection::new();
        let mut producer = Connection::new();
        producer.set_client_id("client-a".into());
        online.connection_set_mut().insert(producer);

        assert_eq!(online.connection_set().len(), 1);
    }

    // With two clients online, the explicitly requested one must win.
    #[test]
    fn select_consumer_direct_connection_uses_requested_client_when_present() {
        let group = CheetahString::from("group-a");
        let wanted = CheetahString::from("client-b");
        let mut consumers = ConsumerConnection::new();
        consumers.insert_connection(connection_with("client-a", "127.0.0.1:1001"));
        consumers.insert_connection(connection_with("client-b", "127.0.0.1:1002"));

        let (client_id, client_addr) = select_consumer_direct_connection(&group, &consumers, Some(&wanted))
            .expect("requested client should be selected");

        assert_eq!(client_id, wanted);
        assert_eq!(client_addr, CheetahString::from("127.0.0.1:1002"));
    }

    // An empty requested id behaves like "no preference".
    #[test]
    fn select_consumer_direct_connection_returns_first_available_client_when_unspecified() {
        let group = CheetahString::from("group-a");
        let unspecified = CheetahString::default();
        let mut consumers = ConsumerConnection::new();
        consumers.insert_connection(connection_with("client-a", "127.0.0.1:1001"));

        let (client_id, client_addr) = select_consumer_direct_connection(&group, &consumers, Some(&unspecified))
            .expect("single consumer should be selected");

        assert_eq!(client_id, CheetahString::from("client-a"));
        assert_eq!(client_addr, CheetahString::from("127.0.0.1:1001"));
    }

    #[test]
    fn select_consumer_direct_connection_errors_when_group_is_offline() {
        let group = CheetahString::from("group-a");
        let no_consumers = ConsumerConnection::new();

        let error = select_consumer_direct_connection(&group, &no_consumers, None)
            .expect_err("offline group should not resolve a client");

        assert!(error.to_string().contains("NO CONSUMER"));
    }

    // A message tagged `TagA` against a `TagB`-only subscription is filtered.
    #[test]
    #[allow(deprecated)]
    fn resolve_consumed_track_type_marks_filtered_subscription() {
        let inner = MessageBuilder::new()
            .topic("TopicTest")
            .body_slice(b"payload")
            .tags("TagA")
            .build_unchecked();
        let mut tagged_message = MessageExt::default();
        tagged_message.set_message_inner(inner);

        let tag_b_only = SubscriptionData {
            topic: CheetahString::from("TopicTest"),
            tags_set: BTreeSet::from([CheetahString::from("TagB")]),
            ..Default::default()
        };

        let mut consumers = ConsumerConnection::new();
        consumers.set_consume_type(ConsumeType::ConsumePassively);
        consumers
            .get_subscription_table_mut()
            .insert(CheetahString::from("TopicTest"), tag_b_only);

        assert_eq!(
            resolve_consumed_track_type(&tagged_message, &consumers),
            TrackType::ConsumedButFiltered
        );
    }

    // Consumer offset 11 is past the message's queue offset 10 on the master
    // broker that stored the message.
    #[test]
    fn is_message_consumed_returns_true_when_offset_has_advanced_on_master() {
        let inner = MessageBuilder::new()
            .topic("TopicTest")
            .body_slice(b"payload")
            .build_unchecked();
        let mut stored_message = MessageExt::default();
        stored_message.set_message_inner(inner);
        stored_message.set_queue_id(1);
        stored_message.set_queue_offset(10);
        stored_message.set_store_host("127.0.0.1:10911".parse().expect("store host"));

        let mut progress = OffsetWrapper::default();
        progress.set_consumer_offset(11);
        let mut consume_stats = ConsumeStats::new();
        consume_stats
            .get_offset_table_mut()
            .insert(MessageQueue::from_parts("TopicTest", "broker-a", 1), progress);

        let mut master_addrs = HashMap::new();
        master_addrs.insert(mix_all::MASTER_ID, CheetahString::from("127.0.0.1:10911"));
        let broker_a = BrokerData::new(
            CheetahString::from("cluster-a"),
            CheetahString::from("broker-a"),
            master_addrs,
            None,
        );
        let cluster_info = ClusterInfo::new(
            Some(HashMap::from([(CheetahString::from("broker-a"), broker_a)])),
            None,
        );

        assert!(is_message_consumed(&stored_message, &consume_stats, &cluster_info));
    }

    #[test]
    fn parse_response_code_from_message_reads_consumer_not_online_code() {
        let parsed = parse_response_code_from_message("CODE: 206 DESC: Not found the consumer group connection");

        assert_eq!(parsed, Some(ResponseCode::ConsumerNotOnline));
    }
}