1#![allow(dead_code)]
16use std::collections::HashMap;
17use std::collections::HashSet;
18use std::env;
19use std::sync::Arc;
20use std::sync::OnceLock;
21use std::time::Duration;
22
23use crate::admin::mq_admin_ext_async::MQAdminExt;
24use crate::admin::mq_admin_ext_async_inner::MQAdminExtInnerImpl;
25use crate::base::client_config::ClientConfig;
26use crate::base::validators::Validators;
27use crate::common::admin_tool_result::AdminToolResult;
28use crate::common::admin_tools_result_code_enum::AdminToolsResultCodeEnum;
29use crate::consumer::consumer_impl::pull_request_ext::PullResultExt;
30use crate::consumer::pull_callback::PullCallback;
31use crate::consumer::pull_status::PullStatus;
32use crate::factory::mq_client_instance::MQClientInstance;
33use crate::implementation::communication_mode::CommunicationMode;
34use crate::implementation::mq_client_api_impl::MQClientAPIImpl;
35use crate::implementation::mq_client_manager::MQClientManager;
36use cheetah_string::CheetahString;
37use rand::seq::IndexedRandom;
38use rocketmq_common::common::attribute::attribute_parser::AttributeParser;
39use rocketmq_common::common::base::plain_access_config::PlainAccessConfig;
40use rocketmq_common::common::base::service_state::ServiceState;
41use rocketmq_common::common::config::TopicConfig;
42use rocketmq_common::common::constant::PermName;
43use rocketmq_common::common::message::message_decoder;
44use rocketmq_common::common::message::message_enum::MessageRequestMode;
45use rocketmq_common::common::message::message_ext::MessageExt;
46use rocketmq_common::common::message::message_queue::MessageQueue;
47use rocketmq_common::common::mix_all;
48use rocketmq_common::common::mix_all::DLQ_GROUP_TOPIC_PREFIX;
49use rocketmq_common::common::mix_all::RETRY_GROUP_TOPIC_PREFIX;
50use rocketmq_common::common::sys_flag::pull_sys_flag::PullSysFlag;
51#[allow(deprecated)]
52use rocketmq_common::common::tools::broker_operator_result::BrokerOperatorResult;
53#[allow(deprecated)]
54use rocketmq_common::common::tools::message_track::MessageTrack;
55#[allow(deprecated)]
56use rocketmq_common::common::tools::track_type::TrackType;
57use rocketmq_common::common::topic::TopicValidator;
58use rocketmq_common::common::FAQUrl;
59use rocketmq_error::RocketMQError;
60use rocketmq_remoting::code::response_code::ResponseCode;
61use rocketmq_remoting::protocol::admin::consume_stats::ConsumeStats;
62use rocketmq_remoting::protocol::admin::consume_stats_list::ConsumeStatsList;
63use rocketmq_remoting::protocol::admin::offset_wrapper::OffsetWrapper;
64use rocketmq_remoting::protocol::admin::rollback_stats::RollbackStats;
65use rocketmq_remoting::protocol::admin::topic_offset::TopicOffset;
66use rocketmq_remoting::protocol::admin::topic_stats_table::TopicStatsTable;
67use rocketmq_remoting::protocol::body::acl_info::AclInfo;
68use rocketmq_remoting::protocol::body::broker_body::broker_member_group::BrokerMemberGroup;
69use rocketmq_remoting::protocol::body::broker_body::cluster_info::ClusterInfo;
70use rocketmq_remoting::protocol::body::broker_replicas_info::BrokerReplicasInfo;
71use rocketmq_remoting::protocol::body::check_rocksdb_cqwrite_progress_response_body::CheckRocksdbCqWriteResult;
72use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;
73use rocketmq_remoting::protocol::body::consumer_connection::ConsumerConnection;
74use rocketmq_remoting::protocol::body::consumer_running_info::ConsumerRunningInfo;
75use rocketmq_remoting::protocol::body::epoch_entry_cache::EpochEntryCache;
76use rocketmq_remoting::protocol::body::get_broker_lite_info_response_body::GetBrokerLiteInfoResponseBody;
77use rocketmq_remoting::protocol::body::get_lite_client_info_response_body::GetLiteClientInfoResponseBody;
78use rocketmq_remoting::protocol::body::get_lite_group_info_response_body::GetLiteGroupInfoResponseBody;
79use rocketmq_remoting::protocol::body::get_lite_topic_info_response_body::GetLiteTopicInfoResponseBody;
80use rocketmq_remoting::protocol::body::get_parent_topic_info_response_body::GetParentTopicInfoResponseBody;
81use rocketmq_remoting::protocol::body::group_list::GroupList;
82use rocketmq_remoting::protocol::body::ha_runtime_info::HARuntimeInfo;
83use rocketmq_remoting::protocol::body::kv_table::KVTable;
84use rocketmq_remoting::protocol::body::producer_connection::ProducerConnection;
85use rocketmq_remoting::protocol::body::producer_table_info::ProducerTableInfo;
86use rocketmq_remoting::protocol::body::query_consume_queue_response_body::QueryConsumeQueueResponseBody;
87use rocketmq_remoting::protocol::body::queue_time_span::QueueTimeSpan;
88use rocketmq_remoting::protocol::body::subscription_group_wrapper::SubscriptionGroupWrapper;
89use rocketmq_remoting::protocol::body::topic::topic_list::TopicList;
90use rocketmq_remoting::protocol::body::topic_info_wrapper::TopicConfigSerializeWrapper;
91use rocketmq_remoting::protocol::body::user_info::UserInfo;
92use rocketmq_remoting::protocol::header::consume_message_directly_result_request_header::ConsumeMessageDirectlyResultRequestHeader;
93use rocketmq_remoting::protocol::header::create_topic_request_header::CreateTopicRequestHeader;
94use rocketmq_remoting::protocol::header::delete_topic_request_header::DeleteTopicRequestHeader;
95use rocketmq_remoting::protocol::header::elect_master_response_header::ElectMasterResponseHeader;
96use rocketmq_remoting::protocol::header::get_consume_stats_request_header::GetConsumeStatsRequestHeader;
97use rocketmq_remoting::protocol::header::get_meta_data_response_header::GetMetaDataResponseHeader;
98use rocketmq_remoting::protocol::header::get_topic_config_request_header::GetTopicConfigRequestHeader;
99use rocketmq_remoting::protocol::header::get_topic_stats_info_request_header::GetTopicStatsInfoRequestHeader;
100use rocketmq_remoting::protocol::header::namesrv::topic_operation_header::DeleteTopicFromNamesrvRequestHeader;
101use rocketmq_remoting::protocol::header::pull_message_request_header::PullMessageRequestHeader;
102use rocketmq_remoting::protocol::header::query_topic_consume_by_who_request_header::QueryTopicConsumeByWhoRequestHeader;
103use rocketmq_remoting::protocol::header::reset_offset_request_header::ResetOffsetRequestHeader;
104use rocketmq_remoting::protocol::header::update_consumer_offset_header::UpdateConsumerOffsetRequestHeader;
105use rocketmq_remoting::protocol::header::view_broker_stats_data_request_header::ViewBrokerStatsDataRequestHeader;
106use rocketmq_remoting::protocol::header::view_message_request_header::ViewMessageRequestHeader;
107use rocketmq_remoting::protocol::heartbeat::consume_type::ConsumeType;
108use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
109use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
110use rocketmq_remoting::protocol::route::route_data_view::QueueData;
111use rocketmq_remoting::protocol::route::topic_route_data::TopicRouteData;
112use rocketmq_remoting::protocol::static_topic::topic_config_and_queue_mapping::TopicConfigAndQueueMapping;
113use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_detail::TopicQueueMappingDetail;
114use rocketmq_remoting::protocol::subscription::broker_stats_data::BrokerStatsData;
115use rocketmq_remoting::protocol::subscription::group_forbidden::GroupForbidden;
116use rocketmq_remoting::protocol::subscription::subscription_group_config::SubscriptionGroupConfig;
117use rocketmq_remoting::runtime::RPCHook;
118use rocketmq_rust::ArcMut;
119use tracing::info;
120
121static SYSTEM_GROUP_SET: OnceLock<HashSet<CheetahString>> = OnceLock::new();
122
123fn get_system_group_set() -> &'static HashSet<CheetahString> {
124 SYSTEM_GROUP_SET.get_or_init(|| {
125 let mut set = HashSet::new();
126 set.insert(CheetahString::from(mix_all::DEFAULT_CONSUMER_GROUP));
127 set.insert(CheetahString::from(mix_all::DEFAULT_PRODUCER_GROUP));
128 set.insert(CheetahString::from(mix_all::TOOLS_CONSUMER_GROUP));
129 set.insert(CheetahString::from(mix_all::SCHEDULE_CONSUMER_GROUP));
130 set.insert(CheetahString::from(mix_all::FILTERSRV_CONSUMER_GROUP));
131 set.insert(CheetahString::from(mix_all::MONITOR_CONSUMER_GROUP));
132 set.insert(CheetahString::from(mix_all::CLIENT_INNER_PRODUCER_GROUP));
133 set.insert(CheetahString::from(mix_all::SELF_TEST_PRODUCER_GROUP));
134 set.insert(CheetahString::from(mix_all::SELF_TEST_CONSUMER_GROUP));
135 set.insert(CheetahString::from(mix_all::ONS_HTTP_PROXY_GROUP));
136 set.insert(CheetahString::from(mix_all::CID_ONSAPI_PERMISSION_GROUP));
137 set.insert(CheetahString::from(mix_all::CID_ONSAPI_OWNER_GROUP));
138 set.insert(CheetahString::from(mix_all::CID_ONSAPI_PULL_GROUP));
139 set.insert(CheetahString::from(mix_all::CID_SYS_RMQ_TRANS));
140 set
141 })
142}
143
144const SOCKS_PROXY_JSON: &str = "socksProxyJson";
145const NAMESPACE_ORDER_TOPIC_CONFIG: &str = "ORDER_TOPIC_CONFIG";
146
147fn encode_topic_attributes(attributes: &HashMap<CheetahString, CheetahString>) -> Option<CheetahString> {
148 if attributes.is_empty() {
149 return None;
150 }
151
152 let serialized = AttributeParser::parse_to_string(
153 &attributes
154 .iter()
155 .map(|(key, value)| (key.to_string(), value.to_string()))
156 .collect::<HashMap<String, String>>(),
157 );
158
159 if serialized.is_empty() {
160 None
161 } else {
162 Some(serialized.into())
163 }
164}
165
166pub struct DefaultMQAdminExtImpl {
167 service_state: ServiceState,
168 client_instance: Option<ArcMut<MQClientInstance>>,
169 rpc_hook: Option<Arc<dyn RPCHook>>,
170 timeout_millis: Duration,
171 kv_namespace_to_delete_list: Vec<CheetahString>,
172 client_config: ArcMut<ClientConfig>,
173 admin_ext_group: CheetahString,
174 inner: Option<ArcMut<DefaultMQAdminExtImpl>>,
175}
176
177impl DefaultMQAdminExtImpl {
178 pub fn new(
179 rpc_hook: Option<Arc<dyn RPCHook>>,
180 timeout_millis: Duration,
181 client_config: ArcMut<ClientConfig>,
182 admin_ext_group: CheetahString,
183 ) -> Self {
184 DefaultMQAdminExtImpl {
185 service_state: ServiceState::CreateJust,
186 client_instance: None,
187 rpc_hook,
188 timeout_millis,
189 kv_namespace_to_delete_list: vec![CheetahString::from_static_str(NAMESPACE_ORDER_TOPIC_CONFIG)],
190 client_config,
191 admin_ext_group,
192 inner: None,
193 }
194 }
195
196 pub fn set_inner(&mut self, inner: ArcMut<DefaultMQAdminExtImpl>) {
197 self.inner = Some(inner);
198 }
199
200 pub fn has_inner(&self) -> bool {
201 self.inner.is_some()
202 }
203
204 pub async fn create_acl_with_acl_info(
205 &self,
206 broker_addr: CheetahString,
207 acl_info: AclInfo,
208 ) -> rocketmq_error::RocketMQResult<()> {
209 let subject = acl_info
210 .subject
211 .clone()
212 .ok_or_else(|| rocketmq_error::RocketMQError::IllegalArgument("ACL subject is required".into()))?;
213
214 if let Some(ref policies) = acl_info.policies {
215 for policy in policies {
216 if let Some(ref entries) = policy.entries {
217 for entry in entries {
218 let resources: Vec<CheetahString> =
219 entry.resource.as_ref().map(|r| vec![r.clone()]).unwrap_or_default();
220
221 let actions: Vec<CheetahString> = entry
222 .actions
223 .as_ref()
224 .map(|a| a.split(',').map(|s| CheetahString::from(s.trim())).collect())
225 .unwrap_or_default();
226
227 let source_ips: Vec<CheetahString> = entry.source_ips.clone().unwrap_or_default();
228
229 let decision: CheetahString = entry.decision.clone().unwrap_or_default();
230
231 self.create_acl(
232 broker_addr.clone(),
233 subject.clone(),
234 resources,
235 actions,
236 source_ips,
237 decision,
238 )
239 .await?;
240 }
241 }
242 }
243 }
244
245 Ok(())
246 }
247
248 pub async fn update_acl_with_acl_info(
249 &self,
250 broker_addr: CheetahString,
251 acl_info: AclInfo,
252 ) -> rocketmq_error::RocketMQResult<()> {
253 let subject = acl_info
254 .subject
255 .clone()
256 .ok_or_else(|| rocketmq_error::RocketMQError::IllegalArgument("ACL subject is required".into()))?;
257
258 if let Some(ref policies) = acl_info.policies {
259 for policy in policies {
260 if let Some(ref entries) = policy.entries {
261 for entry in entries {
262 let resources: Vec<CheetahString> =
263 entry.resource.as_ref().map(|r| vec![r.clone()]).unwrap_or_default();
264
265 let actions: Vec<CheetahString> = entry
266 .actions
267 .as_ref()
268 .map(|a| a.split(',').map(|s| CheetahString::from(s.trim())).collect())
269 .unwrap_or_default();
270
271 let source_ips: Vec<CheetahString> = entry.source_ips.clone().unwrap_or_default();
272
273 let decision: CheetahString = entry.decision.clone().unwrap_or_default();
274
275 self.update_acl(
276 broker_addr.clone(),
277 subject.clone(),
278 resources,
279 actions,
280 source_ips,
281 decision,
282 )
283 .await?;
284 }
285 }
286 }
287 }
288
289 Ok(())
290 }
291
292 pub async fn create_user_with_user_info(
293 &self,
294 broker_addr: CheetahString,
295 user_info: UserInfo,
296 ) -> rocketmq_error::RocketMQResult<()> {
297 let username = user_info
298 .username
299 .clone()
300 .ok_or_else(|| rocketmq_error::RocketMQError::IllegalArgument("User username is required".into()))?;
301
302 let password = user_info.password.clone().unwrap_or_default();
303 let user_type = user_info.user_type.clone().unwrap_or_default();
304
305 self.create_user(broker_addr, username, password, user_type).await
306 }
307
308 pub async fn update_user_with_user_info(
309 &self,
310 broker_addr: CheetahString,
311 user_info: UserInfo,
312 ) -> rocketmq_error::RocketMQResult<()> {
313 let username = user_info
314 .username
315 .clone()
316 .ok_or_else(|| rocketmq_error::RocketMQError::IllegalArgument("User username is required".into()))?;
317
318 let password = user_info.password.clone().unwrap_or_default();
319 let user_type = user_info.user_type.clone().unwrap_or_default();
320 let user_status = user_info.user_status.clone().unwrap_or_default();
321
322 self.update_user(broker_addr, username, password, user_type, user_status)
323 .await
324 }
325
326 pub async fn pull_message_from_queue(
327 &self,
328 broker_addr: &str,
329 mq: &MessageQueue,
330 sub_expression: &str,
331 offset: i64,
332 max_nums: i32,
333 timeout_millis: u64,
334 ) -> rocketmq_error::RocketMQResult<crate::consumer::pull_result::PullResult> {
335 let sys_flag = PullSysFlag::build_sys_flag(false, false, true, false);
336
337 let request_header = PullMessageRequestHeader {
338 consumer_group: CheetahString::from_static_str(mix_all::TOOLS_CONSUMER_GROUP),
339 topic: mq.topic().clone(),
340 queue_id: mq.queue_id(),
341 queue_offset: offset,
342 max_msg_nums: max_nums,
343 sys_flag: sys_flag as i32,
344 commit_offset: 0,
345 suspend_timeout_millis: 0,
346 sub_version: 0,
347 subscription: Some(CheetahString::from(sub_expression)),
348 expression_type: None,
349 max_msg_bytes: None,
350 request_source: None,
351 proxy_forward_client_id: None,
352 topic_request: None,
353 };
354
355 struct NoopPullCallback;
356 impl PullCallback for NoopPullCallback {
357 async fn on_success(&mut self, _pull_result: PullResultExt) {}
358 fn on_exception(&mut self, _e: Box<dyn std::error::Error + Send>) {}
359 }
360
361 let api_impl = self.client_instance.as_ref().unwrap().get_mq_client_api_impl();
362
363 let mut result = MQClientAPIImpl::pull_message(
364 api_impl,
365 CheetahString::from(broker_addr),
366 request_header,
367 timeout_millis,
368 CommunicationMode::Sync,
369 NoopPullCallback,
370 )
371 .await?
372 .ok_or_else(|| rocketmq_error::RocketMQError::Internal("pull_message returned None in sync mode".into()))?;
373
374 if result.pull_result.pull_status == PullStatus::Found {
375 if let Some(mut message_binary) = result.message_binary.take() {
376 let msg_vec = message_decoder::decodes_batch(&mut message_binary, true, true);
377 result.pull_result.msg_found_list = Some(msg_vec.into_iter().map(ArcMut::new).collect());
378 }
379 }
380
381 Ok(result.pull_result)
382 }
383
384 pub async fn query_message_by_key(
385 &self,
386 cluster_name: Option<CheetahString>,
387 topic: CheetahString,
388 key: CheetahString,
389 max_num: i32,
390 begin_timestamp: i64,
391 end_timestamp: i64,
392 _key_type: CheetahString,
393 _last_key: Option<CheetahString>,
394 ) -> rocketmq_error::RocketMQResult<crate::base::query_result::QueryResult> {
395 self.query_message_by_key_internal(cluster_name, topic, key, max_num, begin_timestamp, end_timestamp, false)
396 .await
397 }
398
399 pub async fn query_message_by_unique_key(
400 &self,
401 cluster_name: Option<CheetahString>,
402 topic: CheetahString,
403 unique_key: CheetahString,
404 max_num: i32,
405 begin_timestamp: i64,
406 end_timestamp: i64,
407 ) -> rocketmq_error::RocketMQResult<crate::base::query_result::QueryResult> {
408 self.query_message_by_key_internal(
409 cluster_name,
410 topic,
411 unique_key,
412 max_num,
413 begin_timestamp,
414 end_timestamp,
415 true,
416 )
417 .await
418 }
419
420 async fn query_message_by_key_internal(
421 &self,
422 cluster_name: Option<CheetahString>,
423 topic: CheetahString,
424 key: CheetahString,
425 max_num: i32,
426 begin_timestamp: i64,
427 end_timestamp: i64,
428 unique_key_flag: bool,
429 ) -> rocketmq_error::RocketMQResult<crate::base::query_result::QueryResult> {
430 let route_topic = cluster_name.unwrap_or_else(|| topic.clone());
431 let topic_route_data = self
432 .examine_topic_route_info(route_topic.clone())
433 .await?
434 .ok_or_else(|| {
435 rocketmq_error::RocketMQError::Internal(format!("Topic route not found for: {}", route_topic))
436 })?;
437
438 let mut message_list: Vec<MessageExt> = Vec::new();
439 let mut index_last_update_timestamp: u64 = 0;
440
441 let api_impl = self.client_instance.as_ref().unwrap().get_mq_client_api_impl();
442 let timeout = self.timeout_millis.as_millis() as u64;
443
444 for broker_data in &topic_route_data.broker_datas {
445 let broker_addr = match broker_data.select_broker_addr() {
446 Some(addr) => addr,
447 None => continue,
448 };
449
450 let request_header =
451 rocketmq_remoting::protocol::header::query_message_request_header::QueryMessageRequestHeader {
452 topic: topic.clone(),
453 key: key.clone(),
454 max_num,
455 begin_timestamp,
456 end_timestamp,
457 topic_request_header: None,
458 };
459
460 match MQClientAPIImpl::query_message(&api_impl, &broker_addr, request_header, unique_key_flag, timeout)
461 .await
462 {
463 Ok(Some((response_header, body))) => {
464 if let Some(mut body_bytes) = body {
465 let msgs = message_decoder::decodes_batch(&mut body_bytes, true, true);
466 message_list.extend(msgs);
467 }
468 if response_header.index_last_update_timestamp as u64 > index_last_update_timestamp {
469 index_last_update_timestamp = response_header.index_last_update_timestamp as u64;
470 }
471 }
472 Ok(None) => {
473 }
475 Err(e) => {
476 tracing::warn!("Failed to query message by key from broker {}: {}", broker_addr, e);
477 }
478 }
479 }
480
481 Ok(crate::base::query_result::QueryResult::new(
482 index_last_update_timestamp,
483 message_list,
484 ))
485 }
486}
487
488#[allow(unused_variables)]
489#[allow(unused_mut)]
490impl MQAdminExt for DefaultMQAdminExtImpl {
491 async fn start(&mut self) -> rocketmq_error::RocketMQResult<()> {
492 match self.service_state {
493 ServiceState::CreateJust => {
494 self.service_state = ServiceState::StartFailed;
495 self.client_config.change_instance_name_to_pid();
496 if "{}".eq(&self.client_config.socks_proxy_config) {
497 self.client_config.socks_proxy_config =
498 env::var(SOCKS_PROXY_JSON).unwrap_or_else(|_| "{}".to_string()).into();
499 }
500 self.client_instance = Some(
501 MQClientManager::get_instance()
502 .get_or_create_mq_client_instance(self.client_config.as_ref().clone(), self.rpc_hook.clone()),
503 );
504
505 let group = &self.admin_ext_group.clone();
506 let register_ok = self
507 .client_instance
508 .as_mut()
509 .unwrap()
510 .register_admin_ext(
511 group,
512 MQAdminExtInnerImpl {
513 inner: self.inner.as_ref().unwrap().clone(),
514 },
515 )
516 .await;
517 if !register_ok {
518 self.service_state = ServiceState::StartFailed;
519 return Err(rocketmq_error::RocketMQError::illegal_argument(format!(
520 "The adminExt group[{}] has created already, specified another name please.{}",
521 self.admin_ext_group,
522 FAQUrl::suggest_todo(FAQUrl::GROUP_NAME_DUPLICATE_URL)
523 )));
524 }
525 let arc_mut = self.client_instance.clone().unwrap();
526 self.client_instance.as_mut().unwrap().start(arc_mut).await?;
527 self.service_state = ServiceState::Running;
528 info!("the adminExt [{}] start OK", self.admin_ext_group);
529 Ok(())
530 }
531 ServiceState::Running | ServiceState::ShutdownAlready | ServiceState::StartFailed => {
532 unimplemented!()
533 }
534 }
535 }
536
537 async fn shutdown(&mut self) {
538 match self.service_state {
539 ServiceState::CreateJust | ServiceState::ShutdownAlready | ServiceState::StartFailed => {
540 }
542 ServiceState::Running => {
543 let instance = self.client_instance.as_mut().unwrap();
544 instance.unregister_admin_ext(&self.admin_ext_group).await;
545 instance.shutdown().await;
546 self.service_state = ServiceState::ShutdownAlready;
547 }
548 }
549 }
550
551 async fn add_broker_to_container(
552 &self,
553 broker_container_addr: CheetahString,
554 broker_config: CheetahString,
555 ) -> rocketmq_error::RocketMQResult<()> {
556 todo!()
557 }
558
559 async fn remove_broker_from_container(
560 &self,
561 broker_container_addr: CheetahString,
562 cluster_name: CheetahString,
563 broker_name: CheetahString,
564 broker_id: u64,
565 ) -> rocketmq_error::RocketMQResult<()> {
566 todo!()
567 }
568
569 async fn update_broker_config(
570 &self,
571 broker_addr: CheetahString,
572 properties: HashMap<CheetahString, CheetahString>,
573 ) -> rocketmq_error::RocketMQResult<()> {
574 let validator_input = properties
575 .iter()
576 .map(|(key, value)| (key.to_string(), value.to_string()))
577 .collect::<HashMap<String, String>>();
578 Validators::check_broker_config(&validator_input)?;
579
580 if let Some(ref mq_client_instance) = self.client_instance {
581 mq_client_instance
582 .get_mq_client_api_impl()
583 .update_broker_config(&broker_addr, properties, self.timeout_millis.as_millis() as u64)
584 .await
585 } else {
586 Err(rocketmq_error::RocketMQError::ClientNotStarted)
587 }
588 }
589
590 async fn get_broker_config(
591 &self,
592 broker_addr: CheetahString,
593 ) -> rocketmq_error::RocketMQResult<HashMap<CheetahString, CheetahString>> {
594 if let Some(ref mq_client_instance) = self.client_instance {
595 mq_client_instance
596 .get_mq_client_api_impl()
597 .get_broker_config(&broker_addr, self.timeout_millis.as_millis() as u64)
598 .await
599 } else {
600 Err(rocketmq_error::RocketMQError::ClientNotStarted)
601 }
602 }
603
604 async fn create_and_update_topic_config(
605 &self,
606 addr: CheetahString,
607 config: TopicConfig,
608 ) -> rocketmq_error::RocketMQResult<()> {
609 let topic = config
610 .topic_name
611 .clone()
612 .ok_or_else(|| rocketmq_error::RocketMQError::IllegalArgument("Topic name is required".into()))?;
613 let attributes = encode_topic_attributes(&config.attributes);
614 let request_header = CreateTopicRequestHeader {
615 topic,
616 default_topic: CheetahString::from_static_str(TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC),
617 read_queue_nums: config.read_queue_nums as i32,
618 write_queue_nums: config.write_queue_nums as i32,
619 perm: config.perm as i32,
620 topic_filter_type: config.topic_filter_type.to_string().into(),
621 topic_sys_flag: Some(config.topic_sys_flag as i32),
622 order: config.order,
623 attributes,
624 force: Some(false),
625 topic_request_header: None,
626 };
627
628 self.client_instance
629 .as_ref()
630 .unwrap()
631 .get_mq_client_api_impl()
632 .update_or_create_topic(&addr, request_header, self.timeout_millis.as_millis() as u64)
633 .await
634 }
635
636 async fn create_and_update_topic_config_list(
637 &self,
638 addr: CheetahString,
639 topic_config_list: Vec<TopicConfig>,
640 ) -> rocketmq_error::RocketMQResult<()> {
641 for config in topic_config_list {
642 self.create_and_update_topic_config(addr.clone(), config).await?;
643 }
644 Ok(())
645 }
646
647 async fn create_and_update_plain_access_config(
648 &self,
649 addr: CheetahString,
650 config: PlainAccessConfig,
651 ) -> rocketmq_error::RocketMQResult<()> {
652 todo!()
653 }
654
655 async fn delete_plain_access_config(
656 &self,
657 addr: CheetahString,
658 access_key: CheetahString,
659 ) -> rocketmq_error::RocketMQResult<()> {
660 todo!()
661 }
662
663 async fn update_global_white_addr_config(
664 &self,
665 addr: CheetahString,
666 global_white_addrs: CheetahString,
667 acl_file_full_path: Option<CheetahString>,
668 ) -> rocketmq_error::RocketMQResult<()> {
669 todo!()
670 }
671
672 async fn examine_broker_cluster_acl_version_info(
673 &self,
674 addr: CheetahString,
675 ) -> rocketmq_error::RocketMQResult<CheetahString> {
676 todo!()
677 }
678
679 async fn create_and_update_subscription_group_config(
680 &self,
681 addr: CheetahString,
682 config: SubscriptionGroupConfig,
683 ) -> rocketmq_error::RocketMQResult<()> {
684 self.client_instance
685 .as_ref()
686 .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?
687 .get_mq_client_api_impl()
688 .create_subscription_group(&addr, &config, self.timeout_millis.as_millis() as u64)
689 .await
690 }
691
692 async fn create_and_update_subscription_group_config_list(
693 &self,
694 broker_addr: CheetahString,
695 configs: Vec<SubscriptionGroupConfig>,
696 ) -> rocketmq_error::RocketMQResult<()> {
697 for config in configs {
698 self.create_and_update_subscription_group_config(broker_addr.clone(), config)
699 .await?;
700 }
701 Ok(())
702 }
703
704 async fn examine_subscription_group_config(
705 &self,
706 addr: CheetahString,
707 group: CheetahString,
708 ) -> rocketmq_error::RocketMQResult<SubscriptionGroupConfig> {
709 self.client_instance
710 .as_ref()
711 .unwrap()
712 .get_mq_client_api_impl()
713 .get_subscription_group_config(&addr, group, self.timeout_millis.as_millis() as u64)
714 .await
715 }
716
717 async fn examine_topic_stats(
718 &self,
719 topic: CheetahString,
720 broker_addr: Option<CheetahString>,
721 ) -> rocketmq_error::RocketMQResult<TopicStatsTable> {
722 let timeout = self.timeout_millis.as_millis() as u64;
723 let request_header = GetTopicStatsInfoRequestHeader {
724 topic: topic.clone(),
725 topic_request_header: None,
726 };
727 if let Some(addr) = broker_addr {
728 return self
729 .client_instance
730 .as_ref()
731 .unwrap()
732 .get_mq_client_api_impl()
733 .get_topic_stats_info(&addr, request_header, timeout)
734 .await;
735 }
736
737 let topic_route = self.examine_topic_route_info(topic).await?;
738 let mut result = TopicStatsTable::new();
739 if let Some(route_data) = topic_route {
740 for broker_data in &route_data.broker_datas {
741 if let Some(master_addr) = broker_data.broker_addrs().get(&mix_all::MASTER_ID) {
742 let stats = self
743 .client_instance
744 .as_ref()
745 .unwrap()
746 .get_mq_client_api_impl()
747 .get_topic_stats_info(master_addr, request_header.clone(), timeout)
748 .await?;
749 result.get_offset_table_mut().extend(stats.into_offset_table());
750 }
751 }
752 }
753
754 Ok(result)
755 }
756
757 async fn examine_topic_stats_concurrent(&self, topic: CheetahString) -> AdminToolResult<TopicStatsTable> {
758 match self.examine_topic_stats(topic, None).await {
759 Ok(stats) => AdminToolResult::success(stats),
760 Err(error) => AdminToolResult::failure(
761 crate::common::admin_tools_result_code_enum::AdminToolsResultCodeEnum::RemotingError,
762 error.to_string(),
763 ),
764 }
765 }
766
767 async fn fetch_all_topic_list(&self) -> rocketmq_error::RocketMQResult<TopicList> {
768 self.client_instance
769 .as_ref()
770 .unwrap()
771 .get_mq_client_api_impl()
772 .get_all_topic_list_from_name_server(self.timeout_millis.as_millis() as u64)
773 .await
774 }
775
776 async fn fetch_topics_by_cluster(&self, cluster_name: CheetahString) -> rocketmq_error::RocketMQResult<TopicList> {
777 todo!()
778 }
779
780 async fn fetch_broker_runtime_stats(&self, broker_addr: CheetahString) -> rocketmq_error::RocketMQResult<KVTable> {
781 self.client_instance
782 .as_ref()
783 .unwrap()
784 .get_mq_client_api_impl()
785 .get_broker_runtime_info(&broker_addr, self.timeout_millis.as_millis() as u64)
786 .await
787 }
788
789 async fn examine_consume_stats(
790 &self,
791 consumer_group: CheetahString,
792 topic: Option<CheetahString>,
793 cluster_name: Option<CheetahString>,
794 broker_addr: Option<CheetahString>,
795 timeout_millis: Option<u64>,
796 ) -> rocketmq_error::RocketMQResult<ConsumeStats> {
797 let timeout = timeout_millis.unwrap_or(self.timeout_millis.as_millis() as u64);
798 let topic_str = topic.clone().unwrap_or_default();
799
800 if let Some(addr) = broker_addr {
801 let request_header = GetConsumeStatsRequestHeader {
802 consumer_group,
803 topic: topic_str,
804 topic_request_header: None,
805 };
806 return self
807 .client_instance
808 .as_ref()
809 .unwrap()
810 .get_mq_client_api_impl()
811 .get_consume_stats(&addr, request_header, timeout)
812 .await;
813 }
814
815 let retry_topic: CheetahString = rocketmq_common::common::mix_all::get_retry_topic(&consumer_group).into();
816 let topic_route = self
817 .client_instance
818 .as_ref()
819 .unwrap()
820 .mq_client_api_impl
821 .as_ref()
822 .unwrap()
823 .get_topic_route_info_from_name_server(&retry_topic, timeout)
824 .await?;
825
826 let mut result = ConsumeStats::new();
827
828 if let Some(route_data) = topic_route {
829 for bd in &route_data.broker_datas {
830 if let Some(master_addr) = bd.broker_addrs().get(&rocketmq_common::common::mix_all::MASTER_ID) {
831 let request_header = GetConsumeStatsRequestHeader {
832 consumer_group: consumer_group.clone(),
833 topic: topic_str.clone(),
834 topic_request_header: None,
835 };
836 let cs = self
837 .client_instance
838 .as_ref()
839 .unwrap()
840 .get_mq_client_api_impl()
841 .get_consume_stats(master_addr, request_header, timeout)
842 .await?;
843
844 result.get_offset_table_mut().extend(cs.offset_table);
845 let new_tps = result.get_consume_tps() + cs.consume_tps;
846 result.set_consume_tps(new_tps);
847 }
848 }
849 }
850
851 Ok(result)
852 }
853
854 async fn check_rocksdb_cq_write_progress(
855 &self,
856 broker_addr: CheetahString,
857 topic: CheetahString,
858 check_store_time: i64,
859 ) -> rocketmq_error::RocketMQResult<CheckRocksdbCqWriteResult> {
860 self.client_instance
861 .as_ref()
862 .unwrap()
863 .get_mq_client_api_impl()
864 .check_rocksdb_cq_write_progress(
865 &broker_addr,
866 topic,
867 check_store_time,
868 self.timeout_millis.as_millis() as u64,
869 )
870 .await
871 }
872
873 async fn examine_broker_cluster_info(&self) -> rocketmq_error::RocketMQResult<ClusterInfo> {
874 self.client_instance
875 .as_ref()
876 .unwrap()
877 .get_mq_client_api_impl()
878 .get_broker_cluster_info(self.timeout_millis.as_millis() as u64)
879 .await
880 }
881
882 async fn examine_topic_route_info(
883 &self,
884 topic: CheetahString,
885 ) -> rocketmq_error::RocketMQResult<Option<TopicRouteData>> {
886 self.client_instance
887 .as_ref()
888 .unwrap()
889 .mq_client_api_impl
890 .as_ref()
891 .unwrap()
892 .get_topic_route_info_from_name_server(&topic, self.timeout_millis.as_millis() as u64)
893 .await
894 }
895
896 async fn examine_consumer_connection_info(
897 &self,
898 consumer_group: CheetahString,
899 broker_addr: Option<CheetahString>,
900 ) -> rocketmq_error::RocketMQResult<ConsumerConnection> {
901 let mut result = ConsumerConnection::new();
902 let timeout = self.timeout_millis.as_millis() as u64;
903
904 let selected_addr = if let Some(broker_addr) = broker_addr {
905 Some(broker_addr)
906 } else {
907 let topic = CheetahString::from_string(mix_all::get_retry_topic(consumer_group.as_str()));
908 let topic_route_data = self
909 .client_instance
910 .as_ref()
911 .unwrap()
912 .get_mq_client_api_impl()
913 .get_topic_route_info_from_name_server(&topic, timeout)
914 .await?;
915
916 topic_route_data.and_then(|topic_route_data| {
917 topic_route_data
918 .broker_datas
919 .choose(&mut rand::rng())
920 .and_then(|broker_data| broker_data.select_broker_addr())
921 })
922 };
923
924 if let Some(broker_addr) = selected_addr {
925 result = self
926 .client_instance
927 .as_ref()
928 .unwrap()
929 .get_mq_client_api_impl()
930 .get_consumer_connection_list(broker_addr.as_str(), consumer_group.clone(), timeout)
931 .await?;
932 }
933
934 if result.get_connection_set().is_empty() {
935 return Err(mq_client_err!(
936 rocketmq_remoting::code::response_code::ResponseCode::ConsumerNotOnline,
937 "Not found the consumer group connection"
938 ));
939 }
940
941 Ok(result)
942 }
943
944 async fn examine_producer_connection_info(
945 &self,
946 producer_group: CheetahString,
947 topic: CheetahString,
948 ) -> rocketmq_error::RocketMQResult<ProducerConnection> {
949 let mut result = ProducerConnection::new();
950 let timeout = self.timeout_millis.as_millis() as u64;
951
952 if let Some(topic_route_data) = self.examine_topic_route_info(topic).await? {
953 let brokers = &topic_route_data.broker_datas;
954 let selected_addr = brokers
955 .choose(&mut rand::rng())
956 .and_then(|broker_data| broker_data.select_broker_addr());
957 if let Some(addr) = selected_addr {
958 result = self
959 .client_instance
960 .as_ref()
961 .unwrap()
962 .get_mq_client_api_impl()
963 .get_producer_connection_list(addr.as_str(), producer_group.clone(), timeout)
964 .await?;
965 }
966 }
967
968 if result.connection_set().is_empty() {
969 return Err(mq_client_err!("Not found the producer group connection"));
970 }
971
972 Ok(result)
973 }
974
975 async fn get_all_producer_info(
976 &self,
977 broker_addr: CheetahString,
978 ) -> rocketmq_error::RocketMQResult<ProducerTableInfo> {
979 self.client_instance
980 .as_ref()
981 .unwrap()
982 .get_mq_client_api_impl()
983 .get_all_producer_info(broker_addr.as_str(), self.timeout_millis.as_millis() as u64)
984 .await
985 }
986
987 async fn get_name_server_address_list(&self) -> Vec<CheetahString> {
988 self.client_instance
989 .as_ref()
990 .unwrap()
991 .get_mq_client_api_impl()
992 .get_name_server_address_list()
993 .to_vec()
994 }
995
996 async fn wipe_write_perm_of_broker(
997 &self,
998 namesrv_addr: CheetahString,
999 broker_name: CheetahString,
1000 ) -> rocketmq_error::RocketMQResult<i32> {
1001 self.client_instance
1002 .as_ref()
1003 .unwrap()
1004 .get_mq_client_api_impl()
1005 .wipe_write_perm_of_broker(namesrv_addr, broker_name, self.timeout_millis.as_millis() as u64)
1006 .await
1007 }
1008
1009 async fn add_write_perm_of_broker(
1010 &self,
1011 namesrv_addr: CheetahString,
1012 broker_name: CheetahString,
1013 ) -> rocketmq_error::RocketMQResult<i32> {
1014 self.client_instance
1015 .as_ref()
1016 .unwrap()
1017 .get_mq_client_api_impl()
1018 .add_write_perm_of_broker(namesrv_addr, broker_name, self.timeout_millis.as_millis() as u64)
1019 .await
1020 }
1021
1022 async fn put_kv_config(&self, namespace: CheetahString, key: CheetahString, value: CheetahString) {
1023 todo!()
1024 }
1025
1026 async fn get_kv_config(
1027 &self,
1028 namespace: CheetahString,
1029 key: CheetahString,
1030 ) -> rocketmq_error::RocketMQResult<CheetahString> {
1031 Ok(self
1032 .client_instance
1033 .as_ref()
1034 .unwrap()
1035 .get_mq_client_api_impl()
1036 .get_kvconfig_value(namespace, key, self.timeout_millis.as_millis() as u64)
1037 .await?
1038 .unwrap_or_default())
1039 }
1040
1041 async fn get_kv_list_by_namespace(&self, namespace: CheetahString) -> rocketmq_error::RocketMQResult<KVTable> {
1042 todo!()
1043 }
1044
1045 async fn delete_topic(
1046 &self,
1047 topic_name: CheetahString,
1048 cluster_name: CheetahString,
1049 ) -> rocketmq_error::RocketMQResult<()> {
1050 let cluster_info = self.examine_broker_cluster_info().await?;
1051 let mut broker_addrs = HashSet::new();
1052 if let Some(cluster_addr_table) = cluster_info.cluster_addr_table.as_ref() {
1053 if let Some(broker_names) = cluster_addr_table.get(&cluster_name) {
1054 if let Some(broker_addr_table) = cluster_info.broker_addr_table.as_ref() {
1055 for broker_name in broker_names {
1056 if let Some(broker_data) = broker_addr_table.get(broker_name) {
1057 broker_addrs.extend(broker_data.broker_addrs().values().cloned());
1058 }
1059 }
1060 }
1061 }
1062 }
1063 self.delete_topic_in_broker(broker_addrs, topic_name.clone()).await?;
1064
1065 let namesrv_addrs: HashSet<CheetahString> = self.get_name_server_address_list().await.into_iter().collect();
1066 self.delete_topic_in_name_server(namesrv_addrs, Some(cluster_name), topic_name)
1067 .await
1068 }
1069
1070 async fn delete_topic_in_broker(
1071 &self,
1072 addrs: HashSet<CheetahString>,
1073 topic: CheetahString,
1074 ) -> rocketmq_error::RocketMQResult<()> {
1075 let request_header = DeleteTopicRequestHeader {
1076 topic: topic.clone(),
1077 topic_request_header: None,
1078 };
1079 let api = self.client_instance.as_ref().unwrap().get_mq_client_api_impl();
1080 let timeout = self.timeout_millis.as_millis() as u64;
1081 for addr in addrs {
1082 api.delete_topic_in_broker(
1083 &addr,
1084 DeleteTopicRequestHeader {
1085 topic: request_header.topic.clone(),
1086 topic_request_header: None,
1087 },
1088 timeout,
1089 )
1090 .await?;
1091 }
1092 Ok(())
1093 }
1094
1095 async fn delete_topic_in_name_server(
1096 &self,
1097 addrs: HashSet<CheetahString>,
1098 cluster_name: Option<CheetahString>,
1099 topic: CheetahString,
1100 ) -> rocketmq_error::RocketMQResult<()> {
1101 let request_header = DeleteTopicFromNamesrvRequestHeader::new(topic, cluster_name);
1102 let api = self.client_instance.as_ref().unwrap().get_mq_client_api_impl();
1103 let timeout = self.timeout_millis.as_millis() as u64;
1104 for addr in addrs {
1105 api.delete_topic_in_nameserver(&addr, request_header.clone(), timeout)
1106 .await?;
1107 }
1108 Ok(())
1109 }
1110
1111 async fn delete_subscription_group(
1112 &self,
1113 addr: CheetahString,
1114 group_name: CheetahString,
1115 remove_offset: Option<bool>,
1116 ) -> rocketmq_error::RocketMQResult<()> {
1117 self.client_instance
1118 .as_ref()
1119 .unwrap()
1120 .get_mq_client_api_impl()
1121 .delete_subscription_group(
1122 &addr,
1123 group_name,
1124 remove_offset.unwrap_or(false),
1125 self.timeout_millis.as_millis() as u64,
1126 )
1127 .await
1128 }
1129
1130 async fn create_and_update_kv_config(
1131 &self,
1132 namespace: CheetahString,
1133 key: CheetahString,
1134 value: CheetahString,
1135 ) -> rocketmq_error::RocketMQResult<()> {
1136 self.client_instance
1137 .as_ref()
1138 .unwrap()
1139 .get_mq_client_api_impl()
1140 .put_kvconfig_value(namespace, key, value, self.timeout_millis.as_millis() as u64)
1141 .await
1142 }
1143
1144 async fn delete_kv_config(
1145 &self,
1146 namespace: CheetahString,
1147 key: CheetahString,
1148 ) -> rocketmq_error::RocketMQResult<()> {
1149 self.client_instance
1150 .as_ref()
1151 .unwrap()
1152 .get_mq_client_api_impl()
1153 .delete_kvconfig_value(namespace, key, self.timeout_millis.as_millis() as u64)
1154 .await
1155 }
1156
1157 async fn reset_offset_by_timestamp(
1158 &self,
1159 cluster_name: Option<CheetahString>,
1160 topic: CheetahString,
1161 group: CheetahString,
1162 timestamp: u64,
1163 is_force: bool,
1164 ) -> rocketmq_error::RocketMQResult<HashMap<MessageQueue, u64>> {
1165 let topic_route = self.examine_topic_route_info(topic.clone()).await?;
1166 let mut offset_table = HashMap::new();
1167 let timeout = self.timeout_millis.as_millis() as u64;
1168
1169 if let Some(route_data) = topic_route {
1170 for broker_data in &route_data.broker_datas {
1171 if let Some(expected_cluster) = cluster_name.as_ref() {
1172 if broker_data.cluster() != expected_cluster {
1173 continue;
1174 }
1175 }
1176 if let Some(master_addr) = broker_data.broker_addrs().get(&mix_all::MASTER_ID) {
1177 let request_header = ResetOffsetRequestHeader {
1178 topic: topic.clone(),
1179 group: group.clone(),
1180 queue_id: -1,
1181 offset: None,
1182 timestamp: timestamp as i64,
1183 is_force,
1184 topic_request_header: None,
1185 };
1186 let offsets = self
1187 .client_instance
1188 .as_ref()
1189 .unwrap()
1190 .get_mq_client_api_impl()
1191 .invoke_broker_to_reset_offset(master_addr, request_header, timeout)
1192 .await?;
1193 offset_table.extend(offsets.into_iter().map(|(mq, offset)| (mq, offset as u64)));
1194 }
1195 }
1196 }
1197
1198 Ok(offset_table)
1199 }
1200
1201 async fn reset_offset_new(
1202 &self,
1203 consumer_group: CheetahString,
1204 topic: CheetahString,
1205 timestamp: u64,
1206 ) -> rocketmq_error::RocketMQResult<()> {
1207 todo!()
1208 }
1209
1210 async fn get_consume_status(
1211 &self,
1212 topic: CheetahString,
1213 group: CheetahString,
1214 client_addr: CheetahString,
1215 ) -> rocketmq_error::RocketMQResult<HashMap<CheetahString, HashMap<MessageQueue, u64>>> {
1216 let topic_route_data = self.examine_topic_route_info(topic.clone()).await?;
1217 if let Some(route_data) = topic_route_data {
1218 if !route_data.broker_datas.is_empty() {
1219 if let Some(addr) = route_data.broker_datas[0].select_broker_addr() {
1220 let result = self
1221 .client_instance
1222 .as_ref()
1223 .unwrap()
1224 .get_mq_client_api_impl()
1225 .invoke_broker_to_get_consumer_status(
1226 addr.as_str(),
1227 topic,
1228 group,
1229 client_addr,
1230 self.timeout_millis.as_millis() as u64,
1231 )
1232 .await?;
1233 let converted: HashMap<CheetahString, HashMap<MessageQueue, u64>> = result
1234 .into_iter()
1235 .map(|(k, v)| {
1236 let inner: HashMap<MessageQueue, u64> =
1237 v.into_iter().map(|(mq, off)| (mq, off as u64)).collect();
1238 (k, inner)
1239 })
1240 .collect();
1241 return Ok(converted);
1242 }
1243 }
1244 }
1245 Ok(HashMap::new())
1246 }
1247
1248 async fn create_or_update_order_conf(
1249 &self,
1250 key: CheetahString,
1251 value: CheetahString,
1252 is_cluster: bool,
1253 ) -> rocketmq_error::RocketMQResult<()> {
1254 if is_cluster {
1255 return self
1256 .client_instance
1257 .as_ref()
1258 .unwrap()
1259 .get_mq_client_api_impl()
1260 .put_kvconfig_value(
1261 CheetahString::from_static_str(NAMESPACE_ORDER_TOPIC_CONFIG),
1262 key,
1263 value,
1264 self.timeout_millis.as_millis() as u64,
1265 )
1266 .await;
1267 }
1268
1269 let existing = self
1270 .client_instance
1271 .as_ref()
1272 .unwrap()
1273 .get_mq_client_api_impl()
1274 .get_kvconfig_value(
1275 CheetahString::from_static_str(NAMESPACE_ORDER_TOPIC_CONFIG),
1276 key.clone(),
1277 self.timeout_millis.as_millis() as u64,
1278 )
1279 .await?
1280 .unwrap_or_default();
1281
1282 let merged_order_conf = merge_order_conf_entries(existing.as_str(), value.as_str());
1283
1284 self.client_instance
1285 .as_ref()
1286 .unwrap()
1287 .get_mq_client_api_impl()
1288 .put_kvconfig_value(
1289 CheetahString::from_static_str(NAMESPACE_ORDER_TOPIC_CONFIG),
1290 key,
1291 merged_order_conf.into(),
1292 self.timeout_millis.as_millis() as u64,
1293 )
1294 .await
1295 }
1296
1297 async fn query_topic_consume_by_who(&self, topic: CheetahString) -> rocketmq_error::RocketMQResult<GroupList> {
1298 let topic_route = self
1299 .client_instance
1300 .as_ref()
1301 .unwrap()
1302 .mq_client_api_impl
1303 .as_ref()
1304 .unwrap()
1305 .get_topic_route_info_from_name_server(&topic, self.timeout_millis.as_millis() as u64)
1306 .await?;
1307
1308 if let Some(route_data) = topic_route {
1309 for bd in &route_data.broker_datas {
1310 if let Some(master_addr) = bd.broker_addrs().get(&rocketmq_common::common::mix_all::MASTER_ID) {
1311 let request_header = QueryTopicConsumeByWhoRequestHeader {
1312 topic: topic.clone(),
1313 topic_request_header: None,
1314 };
1315 return self
1316 .client_instance
1317 .as_ref()
1318 .unwrap()
1319 .get_mq_client_api_impl()
1320 .query_topic_consume_by_who(master_addr, request_header, self.timeout_millis.as_millis() as u64)
1321 .await;
1322 }
1323 }
1324 }
1325
1326 Ok(GroupList::default())
1327 }
1328
1329 async fn query_topics_by_consumer(&self, group: CheetahString) -> rocketmq_error::RocketMQResult<TopicList> {
1330 todo!()
1331 }
1332
1333 async fn query_topics_by_consumer_concurrent(&self, group: CheetahString) -> AdminToolResult<TopicList> {
1334 todo!()
1335 }
1336
1337 async fn query_subscription(
1338 &self,
1339 group: CheetahString,
1340 topic: CheetahString,
1341 ) -> rocketmq_error::RocketMQResult<SubscriptionData> {
1342 todo!()
1343 }
1344
1345 async fn clean_expired_consumer_queue(
1346 &self,
1347 cluster: Option<CheetahString>,
1348 addr: Option<CheetahString>,
1349 ) -> rocketmq_error::RocketMQResult<bool> {
1350 todo!()
1351 }
1352
1353 async fn delete_expired_commit_log(
1354 &self,
1355 cluster: Option<CheetahString>,
1356 addr: Option<CheetahString>,
1357 ) -> rocketmq_error::RocketMQResult<bool> {
1358 todo!()
1359 }
1360
1361 async fn clean_unused_topic(
1362 &self,
1363 cluster: Option<CheetahString>,
1364 addr: Option<CheetahString>,
1365 ) -> rocketmq_error::RocketMQResult<bool> {
1366 todo!()
1367 }
1368
1369 async fn get_consumer_running_info(
1370 &self,
1371 consumer_group: CheetahString,
1372 client_id: CheetahString,
1373 jstack: bool,
1374 _metrics: Option<bool>,
1375 ) -> rocketmq_error::RocketMQResult<ConsumerRunningInfo> {
1376 let broker_addr = self
1377 .examine_consumer_connection_info(consumer_group.clone(), None)
1378 .await?
1379 .get_connection_set()
1380 .iter()
1381 .find(|connection| connection.get_client_id() == client_id)
1382 .map(|connection| connection.get_client_addr().clone())
1383 .ok_or_else(|| {
1384 rocketmq_error::RocketMQError::IllegalArgument(format!(
1385 "Client `{}` was not found in consumer group `{}`",
1386 client_id, consumer_group
1387 ))
1388 })?;
1389
1390 self.client_instance
1391 .as_ref()
1392 .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?
1393 .get_mq_client_api_impl()
1394 .get_consumer_running_info(
1395 &broker_addr,
1396 consumer_group,
1397 client_id,
1398 jstack,
1399 self.timeout_millis.as_millis() as u64,
1400 )
1401 .await
1402 }
1403
1404 async fn consume_message_directly(
1405 &self,
1406 consumer_group: CheetahString,
1407 client_id: CheetahString,
1408 topic: CheetahString,
1409 msg_id: CheetahString,
1410 ) -> rocketmq_error::RocketMQResult<ConsumeMessageDirectlyResult> {
1411 let consumer_connection = self
1412 .examine_consumer_connection_info(consumer_group.clone(), None)
1413 .await?;
1414 let (resolved_client_id, client_addr) =
1415 select_consumer_direct_connection(&consumer_group, &consumer_connection, Some(&client_id))?;
1416 let message = MQAdminExt::query_message(self, CheetahString::default(), topic.clone(), msg_id.clone()).await?;
1417 let request_header = ConsumeMessageDirectlyResultRequestHeader {
1418 consumer_group,
1419 client_id: Some(resolved_client_id),
1420 msg_id: Some(msg_id),
1421 broker_name: (!message.broker_name().is_empty()).then(|| message.broker_name.clone()),
1422 topic: Some(topic),
1423 topic_sys_flag: None,
1424 group_sys_flag: None,
1425 topic_request_header: None,
1426 };
1427
1428 self.client_instance
1429 .as_ref()
1430 .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?
1431 .get_mq_client_api_impl()
1432 .consume_message_directly(
1433 &client_addr,
1434 request_header,
1435 &message,
1436 self.timeout_millis.as_millis() as u64,
1437 )
1438 .await
1439 }
1440
1441 async fn consume_message_directly_ext(
1442 &self,
1443 _cluster_name: CheetahString,
1444 consumer_group: CheetahString,
1445 client_id: CheetahString,
1446 topic: CheetahString,
1447 msg_id: CheetahString,
1448 ) -> rocketmq_error::RocketMQResult<ConsumeMessageDirectlyResult> {
1449 self.consume_message_directly(consumer_group, client_id, topic, msg_id)
1450 .await
1451 }
1452
1453 async fn clone_group_offset(
1454 &self,
1455 src_group: CheetahString,
1456 dest_group: CheetahString,
1457 topic: CheetahString,
1458 is_offline: bool,
1459 ) -> rocketmq_error::RocketMQResult<()> {
1460 todo!()
1461 }
1462
1463 async fn get_cluster_list(&self, topic: String) -> rocketmq_error::RocketMQResult<HashSet<CheetahString>> {
1464 todo!()
1465 }
1466
1467 async fn get_topic_cluster_list(&self, topic: String) -> rocketmq_error::RocketMQResult<HashSet<CheetahString>> {
1468 let cluster_info = self.examine_broker_cluster_info().await?;
1469 let topic_route_data = self.examine_topic_route_info(topic.into()).await?.unwrap();
1470 let broker_data = topic_route_data
1471 .broker_datas
1472 .first()
1473 .ok_or_else(|| mq_client_err!("Broker datas is empty"))?;
1474 let mut cluster_set = HashSet::new();
1475 let broker_name = broker_data.broker_name();
1476 if let Some(cluster_addr_table) = cluster_info.cluster_addr_table.as_ref() {
1477 cluster_set.extend(
1478 cluster_addr_table
1479 .iter()
1480 .filter(|(cluster_name, broker_names)| broker_names.contains(broker_name))
1481 .map(|(cluster_name, broker_names)| cluster_name.clone()),
1482 );
1483 }
1484 Ok(cluster_set)
1485 }
1486
1487 async fn get_all_topic_config(
1488 &self,
1489 broker_addr: CheetahString,
1490 timeout_millis: u64,
1491 ) -> rocketmq_error::RocketMQResult<TopicConfigSerializeWrapper> {
1492 self.client_instance
1493 .as_ref()
1494 .unwrap()
1495 .get_mq_client_api_impl()
1496 .get_all_topic_config(&broker_addr, timeout_millis)
1497 .await
1498 }
1499
1500 async fn get_user_topic_config(
1501 &self,
1502 broker_addr: CheetahString,
1503 special_topic: bool,
1504 timeout_millis: u64,
1505 ) -> rocketmq_error::RocketMQResult<TopicConfigSerializeWrapper> {
1506 let mut topic_config_wrapper = self.get_all_topic_config(broker_addr, timeout_millis).await?;
1507
1508 if let Some(ref mut topic_table) = topic_config_wrapper.topic_config_table_mut() {
1509 topic_table.retain(|topic_name, topic_config| {
1510 if TopicValidator::is_system_topic(topic_name.as_str()) {
1511 return false;
1512 }
1513 if !special_topic
1514 && (topic_name.starts_with(RETRY_GROUP_TOPIC_PREFIX)
1515 || topic_name.starts_with(DLQ_GROUP_TOPIC_PREFIX))
1516 {
1517 return false;
1518 }
1519 if !PermName::is_valid(topic_config.perm) {
1520 return false;
1521 }
1522 true
1523 });
1524 }
1525
1526 Ok(topic_config_wrapper)
1527 }
1528
1529 async fn update_consume_offset(
1530 &self,
1531 broker_addr: CheetahString,
1532 consume_group: CheetahString,
1533 mq: MessageQueue,
1534 offset: u64,
1535 ) -> rocketmq_error::RocketMQResult<()> {
1536 todo!()
1537 }
1538
1539 async fn update_name_server_config(
1540 &self,
1541 properties: HashMap<CheetahString, CheetahString>,
1542 name_servers: Option<Vec<CheetahString>>,
1543 ) -> rocketmq_error::RocketMQResult<()> {
1544 self.client_instance
1545 .as_ref()
1546 .unwrap()
1547 .get_mq_client_api_impl()
1548 .update_name_server_config(properties, name_servers, self.timeout_millis.as_millis() as u64)
1549 .await
1550 }
1551
1552 async fn get_name_server_config(
1553 &self,
1554 name_servers: Vec<CheetahString>,
1555 ) -> rocketmq_error::RocketMQResult<HashMap<CheetahString, HashMap<CheetahString, CheetahString>>> {
1556 Ok(self
1557 .client_instance
1558 .as_ref()
1559 .unwrap()
1560 .mq_client_api_impl
1561 .as_ref()
1562 .unwrap()
1563 .get_name_server_config(Some(name_servers), self.timeout_millis)
1564 .await?
1565 .unwrap_or_default())
1566 }
1567
1568 async fn probe_name_server(&self, name_server: CheetahString) -> rocketmq_error::RocketMQResult<()> {
1569 self.client_instance
1570 .as_ref()
1571 .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?
1572 .get_mq_client_api_impl()
1573 .probe_name_server(&name_server, self.timeout_millis)
1574 .await
1575 }
1576
1577 async fn resume_check_half_message(
1578 &self,
1579 topic: CheetahString,
1580 msg_id: CheetahString,
1581 ) -> rocketmq_error::RocketMQResult<bool> {
1582 todo!()
1583 }
1584
1585 async fn set_message_request_mode(
1586 &self,
1587 broker_addr: CheetahString,
1588 topic: CheetahString,
1589 consumer_group: CheetahString,
1590 mode: MessageRequestMode,
1591 pop_work_group_size: i32,
1592 timeout_millis: u64,
1593 ) -> rocketmq_error::RocketMQResult<()> {
1594 let mut mq_client_api = self.client_instance.as_ref().unwrap().get_mq_client_api_impl();
1595 match mq_client_api
1596 .set_message_request_mode(
1597 &broker_addr,
1598 &topic,
1599 &consumer_group,
1600 mode,
1601 pop_work_group_size,
1602 timeout_millis,
1603 )
1604 .await
1605 {
1606 Ok(_) => Ok(()),
1607 Err(e) => Err(e),
1608 }
1609 }
1610
1611 async fn reset_offset_by_queue_id(
1612 &self,
1613 broker_addr: CheetahString,
1614 consumer_group: CheetahString,
1615 topic_name: CheetahString,
1616 queue_id: i32,
1617 reset_offset: u64,
1618 ) -> rocketmq_error::RocketMQResult<()> {
1619 todo!()
1620 }
1621
1622 async fn examine_topic_config(
1623 &self,
1624 addr: CheetahString,
1625 topic: CheetahString,
1626 ) -> rocketmq_error::RocketMQResult<TopicConfig> {
1627 let request_header = GetTopicConfigRequestHeader {
1628 topic,
1629 topic_request_header: None,
1630 };
1631 let mapping: TopicConfigAndQueueMapping = self
1632 .client_instance
1633 .as_ref()
1634 .unwrap()
1635 .get_mq_client_api_impl()
1636 .get_topic_config(&addr, request_header, self.timeout_millis.as_millis() as u64)
1637 .await?;
1638
1639 Ok(mapping.topic_config)
1640 }
1641
1642 async fn create_static_topic(
1643 &self,
1644 addr: CheetahString,
1645 default_topic: CheetahString,
1646 topic_config: TopicConfig,
1647 mapping_detail: TopicQueueMappingDetail,
1648 force: bool,
1649 ) -> rocketmq_error::RocketMQResult<()> {
1650 todo!()
1651 }
1652
1653 async fn get_controller_meta_data(
1654 &self,
1655 controller_addr: CheetahString,
1656 ) -> rocketmq_error::RocketMQResult<GetMetaDataResponseHeader> {
1657 if let Some(ref mq_client_instance) = self.client_instance {
1658 Ok(mq_client_instance
1659 .get_mq_client_api_impl()
1660 .get_controller_metadata(controller_addr, self.timeout_millis.as_millis() as u64)
1661 .await?)
1662 } else {
1663 Err(rocketmq_error::RocketMQError::ClientNotStarted)
1664 }
1665 }
1666
1667 async fn reset_master_flush_offset(
1668 &self,
1669 broker_addr: CheetahString,
1670 master_flush_offset: u64,
1671 ) -> rocketmq_error::RocketMQResult<()> {
1672 if let Some(ref mq_client_instance) = self.client_instance {
1673 mq_client_instance
1674 .get_mq_client_api_impl()
1675 .reset_master_flush_offset(&broker_addr, master_flush_offset as i64)
1676 .await
1677 } else {
1678 Err(rocketmq_error::RocketMQError::ClientNotStarted)
1679 }
1680 }
1681
1682 async fn get_controller_config(
1683 &self,
1684 controller_servers: Vec<CheetahString>,
1685 ) -> rocketmq_error::RocketMQResult<HashMap<CheetahString, HashMap<CheetahString, CheetahString>>> {
1686 if let Some(ref mq_client_instance) = self.client_instance {
1687 let mut result: HashMap<CheetahString, HashMap<CheetahString, CheetahString>> = HashMap::new();
1688 let mq_client_api = mq_client_instance.get_mq_client_api_impl();
1689 let timeout_millis = self.timeout_millis.as_millis() as u64;
1690
1691 for controller_addr in controller_servers {
1692 match mq_client_api
1693 .get_controller_config(controller_addr.clone(), timeout_millis)
1694 .await
1695 {
1696 Ok(config) => {
1697 result.insert(controller_addr, config);
1698 }
1699 Err(e) => {
1700 eprintln!("Failed to get config from controller {}: {}", controller_addr, e);
1701 }
1702 }
1703 }
1704
1705 Ok(result)
1706 } else {
1707 Err(rocketmq_error::RocketMQError::ClientNotStarted)
1708 }
1709 }
1710
1711 async fn update_controller_config(
1712 &self,
1713 properties: HashMap<CheetahString, CheetahString>,
1714 controllers: Vec<CheetahString>,
1715 ) -> rocketmq_error::RocketMQResult<()> {
1716 todo!()
1717 }
1718
1719 async fn clean_controller_broker_data(
1720 &self,
1721 controller_addr: CheetahString,
1722 cluster_name: CheetahString,
1723 broker_name: CheetahString,
1724 broker_controller_ids_to_clean: Option<CheetahString>,
1725 is_clean_living_broker: bool,
1726 ) -> rocketmq_error::RocketMQResult<()> {
1727 todo!()
1728 }
1729
1730 async fn update_cold_data_flow_ctr_group_config(
1731 &self,
1732 broker_addr: CheetahString,
1733 properties: HashMap<CheetahString, CheetahString>,
1734 ) -> rocketmq_error::RocketMQResult<()> {
1735 self.client_instance
1736 .as_ref()
1737 .unwrap()
1738 .get_mq_client_api_impl()
1739 .update_cold_data_flow_ctr_group_config(broker_addr, properties, self.timeout_millis.as_millis() as u64)
1740 .await
1741 }
1742
1743 async fn remove_cold_data_flow_ctr_group_config(
1744 &self,
1745 broker_addr: CheetahString,
1746 consumer_group: CheetahString,
1747 ) -> rocketmq_error::RocketMQResult<()> {
1748 todo!()
1749 }
1750
1751 async fn get_cold_data_flow_ctr_info(
1752 &self,
1753 broker_addr: CheetahString,
1754 ) -> rocketmq_error::RocketMQResult<CheetahString> {
1755 todo!()
1756 }
1757
1758 async fn set_commit_log_read_ahead_mode(
1759 &self,
1760 broker_addr: CheetahString,
1761 mode: CheetahString,
1762 ) -> rocketmq_error::RocketMQResult<CheetahString> {
1763 todo!()
1764 }
1765
1766 async fn create_user(
1767 &self,
1768 broker_addr: CheetahString,
1769 username: CheetahString,
1770 password: CheetahString,
1771 user_type: CheetahString,
1772 ) -> rocketmq_error::RocketMQResult<()> {
1773 let user_info = UserInfo {
1774 username: Some(username),
1775 user_type: Some(user_type),
1776 password: Some(password),
1777 user_status: None,
1778 };
1779
1780 if let Some(ref mq_client_instance) = self.client_instance {
1781 let mq_client_api = mq_client_instance.get_mq_client_api_impl();
1782 let timeout_millis = self.timeout_millis.as_millis() as u64;
1783 mq_client_api
1784 .create_user(broker_addr, &user_info, timeout_millis)
1785 .await?;
1786 Ok(())
1787 } else {
1788 Err(rocketmq_error::RocketMQError::ClientNotStarted)
1789 }
1790 }
1791
1792 async fn update_user(
1793 &self,
1794 broker_addr: CheetahString,
1795 username: CheetahString,
1796 password: CheetahString,
1797 user_type: CheetahString,
1798 user_status: CheetahString,
1799 ) -> rocketmq_error::RocketMQResult<()> {
1800 let mut user_info = UserInfo {
1801 username: Some(username),
1802 user_type: Some(user_type),
1803 password: Some(password),
1804 user_status: Some(user_status),
1805 };
1806
1807 if let Some(ref mq_client_instance) = self.client_instance {
1808 let mq_client_api = mq_client_instance.get_mq_client_api_impl();
1809 let timeout_millis = self.timeout_millis.as_millis() as u64;
1810 mq_client_api
1811 .update_user(broker_addr, &user_info, timeout_millis)
1812 .await?;
1813 Ok(())
1814 } else {
1815 Err(rocketmq_error::RocketMQError::ClientNotStarted)
1816 }
1817 }
1818
1819 async fn delete_user(
1820 &self,
1821 broker_addr: CheetahString,
1822 username: CheetahString,
1823 ) -> rocketmq_error::RocketMQResult<()> {
1824 if let Some(ref mq_client_instance) = self.client_instance {
1825 let mq_client_api = mq_client_instance.get_mq_client_api_impl();
1826 let timeout_millis = self.timeout_millis.as_millis() as u64;
1827 mq_client_api.delete_user(broker_addr, username, timeout_millis).await?;
1828 Ok(())
1829 } else {
1830 Err(rocketmq_error::RocketMQError::ClientNotStarted)
1831 }
1832 }
1833
1834 async fn create_acl(
1835 &self,
1836 broker_addr: CheetahString,
1837 subject: CheetahString,
1838 resources: Vec<CheetahString>,
1839 actions: Vec<CheetahString>,
1840 source_ips: Vec<CheetahString>,
1841 decision: CheetahString,
1842 ) -> rocketmq_error::RocketMQResult<()> {
1843 todo!()
1844 }
1845
1846 async fn update_acl(
1847 &self,
1848 broker_addr: CheetahString,
1849 subject: CheetahString,
1850 resources: Vec<CheetahString>,
1851 actions: Vec<CheetahString>,
1852 source_ips: Vec<CheetahString>,
1853 decision: CheetahString,
1854 ) -> rocketmq_error::RocketMQResult<()> {
1855 todo!()
1856 }
1857
1858 async fn delete_acl(
1859 &self,
1860 broker_addr: CheetahString,
1861 subject: CheetahString,
1862 resource: CheetahString,
1863 ) -> rocketmq_error::RocketMQResult<()> {
1864 if let Some(ref client_instance) = self.client_instance {
1865 let mq_client_api = client_instance.get_mq_client_api_impl();
1866 mq_client_api
1867 .delete_acl(broker_addr, subject, resource, self.timeout_millis.as_millis() as u64)
1868 .await
1869 } else {
1870 Err(rocketmq_error::RocketMQError::ClientNotStarted)
1871 }
1872 }
1873
1874 async fn create_lite_pull_topic(
1875 &self,
1876 _addr: CheetahString,
1877 _topic: CheetahString,
1878 _queue_num: i32,
1879 _topic_sys_flag: i32,
1880 _read_queue_nums: i32,
1881 _write_queue_nums: i32,
1882 ) -> rocketmq_error::RocketMQResult<()> {
1883 unimplemented!("create_lite_pull_topic not implemented yet")
1884 }
1885
1886 async fn update_lite_pull_topic(
1887 &self,
1888 _addr: CheetahString,
1889 _topic: CheetahString,
1890 _read_queue_nums: i32,
1891 _write_queue_nums: i32,
1892 ) -> rocketmq_error::RocketMQResult<()> {
1893 unimplemented!("update_lite_pull_topic not implemented yet")
1894 }
1895
1896 async fn get_lite_pull_topic(
1897 &self,
1898 _addr: CheetahString,
1899 _topic: CheetahString,
1900 ) -> rocketmq_error::RocketMQResult<TopicConfig> {
1901 unimplemented!("get_lite_pull_topic not implemented yet")
1902 }
1903
1904 async fn delete_lite_pull_topic(
1905 &self,
1906 _addr: CheetahString,
1907 _cluster_name: CheetahString,
1908 _topic: CheetahString,
1909 ) -> rocketmq_error::RocketMQResult<()> {
1910 unimplemented!("delete_lite_pull_topic not implemented yet")
1911 }
1912
1913 async fn query_lite_pull_topic_list(&self, _addr: CheetahString) -> rocketmq_error::RocketMQResult<TopicList> {
1914 unimplemented!("query_lite_pull_topic_list not implemented yet")
1915 }
1916
1917 async fn query_lite_pull_topic_by_cluster(
1918 &self,
1919 _cluster_name: CheetahString,
1920 ) -> rocketmq_error::RocketMQResult<TopicList> {
1921 unimplemented!("query_lite_pull_topic_by_cluster not implemented yet")
1922 }
1923
1924 async fn query_lite_pull_subscription_list(
1925 &self,
1926 _addr: CheetahString,
1927 _topic: CheetahString,
1928 ) -> rocketmq_error::RocketMQResult<GroupList> {
1929 unimplemented!("query_lite_pull_subscription_list not implemented yet")
1930 }
1931
1932 async fn update_lite_pull_consumer_offset(
1933 &self,
1934 _addr: CheetahString,
1935 _topic: CheetahString,
1936 _group: CheetahString,
1937 _queue_id: i32,
1938 _offset: u64,
1939 ) -> rocketmq_error::RocketMQResult<()> {
1940 unimplemented!("update_lite_pull_consumer_offset not implemented yet")
1941 }
1942
1943 async fn examine_consume_stats_with_queue(
1944 &self,
1945 _consumer_group: CheetahString,
1946 _topic: Option<CheetahString>,
1947 _queue_id: Option<i32>,
1948 ) -> rocketmq_error::RocketMQResult<ConsumeStats> {
1949 unimplemented!("examine_consume_stats_with_queue not implemented yet")
1950 }
1951
1952 async fn examine_consume_stats_concurrent(
1953 &self,
1954 _consumer_group: CheetahString,
1955 _topic: Option<CheetahString>,
1956 ) -> AdminToolResult<ConsumeStats> {
1957 unimplemented!("examine_consume_stats_concurrent not implemented yet")
1958 }
1959
1960 async fn examine_consume_stats_concurrent_with_cluster(
1961 &self,
1962 _consumer_group: CheetahString,
1963 _topic: Option<CheetahString>,
1964 _cluster_name: Option<CheetahString>,
1965 ) -> AdminToolResult<ConsumeStats> {
1966 unimplemented!("examine_consume_stats_concurrent_with_cluster not implemented yet")
1967 }
1968
1969 async fn export_rocksdb_consumer_offset_to_json(
1970 &self,
1971 _broker_addr: CheetahString,
1972 _file_path: CheetahString,
1973 ) -> rocketmq_error::RocketMQResult<()> {
1974 unimplemented!("export_rocksdb_consumer_offset_to_json not implemented yet")
1975 }
1976
1977 async fn export_rocksdb_consumer_offset_from_memory(
1978 &self,
1979 _broker_addr: CheetahString,
1980 ) -> rocketmq_error::RocketMQResult<CheetahString> {
1981 unimplemented!("export_rocksdb_consumer_offset_from_memory not implemented yet")
1982 }
1983
1984 async fn sync_broker_member_group(
1985 &self,
1986 _controller_addr: CheetahString,
1987 _cluster_name: CheetahString,
1988 _broker_name: CheetahString,
1989 ) -> rocketmq_error::RocketMQResult<()> {
1990 unimplemented!("sync_broker_member_group not implemented yet")
1991 }
1992
1993 async fn get_topic_config_by_topic_name(
1994 &self,
1995 _broker_addr: CheetahString,
1996 _topic_name: CheetahString,
1997 ) -> rocketmq_error::RocketMQResult<TopicConfig> {
1998 unimplemented!("get_topic_config_by_topic_name not implemented yet")
1999 }
2000
2001 async fn notify_min_broker_id_changed(
2002 &self,
2003 _cluster_name: CheetahString,
2004 _broker_name: CheetahString,
2005 _min_broker_id: u64,
2006 _min_broker_addr: CheetahString,
2007 _offline_broker_addr: Option<CheetahString>,
2008 _ha_broker_addr: Option<CheetahString>,
2009 ) -> rocketmq_error::RocketMQResult<()> {
2010 unimplemented!("notify_min_broker_id_changed not implemented yet")
2011 }
2012
2013 async fn get_topic_stats_info(
2014 &self,
2015 _broker_addr: CheetahString,
2016 _topic: CheetahString,
2017 ) -> rocketmq_error::RocketMQResult<TopicStatsTable> {
2018 unimplemented!("get_topic_stats_info not implemented yet")
2019 }
2020
2021 async fn query_broker_has_topic(
2022 &self,
2023 _broker_addr: CheetahString,
2024 _topic: CheetahString,
2025 ) -> rocketmq_error::RocketMQResult<bool> {
2026 unimplemented!("query_broker_has_topic not implemented yet")
2027 }
2028
2029 async fn get_system_topic_list_from_broker(
2030 &self,
2031 _broker_addr: CheetahString,
2032 ) -> rocketmq_error::RocketMQResult<TopicList> {
2033 unimplemented!("get_system_topic_list_from_broker not implemented yet")
2034 }
2035
2036 async fn examine_topic_route_info_with_timeout(
2037 &self,
2038 _topic: CheetahString,
2039 _timeout_millis: u64,
2040 ) -> rocketmq_error::RocketMQResult<Option<TopicRouteData>> {
2041 unimplemented!("examine_topic_route_info_with_timeout not implemented yet")
2042 }
2043
2044 async fn export_pop_records(
2045 &self,
2046 _broker_addr: CheetahString,
2047 _timeout: u64,
2048 ) -> rocketmq_error::RocketMQResult<()> {
2049 unimplemented!("export_pop_records not implemented yet")
2050 }
2051
2052 async fn switch_timer_engine(
2053 &self,
2054 _broker_addr: CheetahString,
2055 _des_timer_engine: CheetahString,
2056 ) -> rocketmq_error::RocketMQResult<()> {
2057 unimplemented!("switch_timer_engine not implemented yet")
2058 }
2059
2060 async fn trigger_lite_dispatch(
2061 &self,
2062 _broker_addr: CheetahString,
2063 _group: CheetahString,
2064 _client_id: CheetahString,
2065 ) -> rocketmq_error::RocketMQResult<()> {
2066 unimplemented!("trigger_lite_dispatch not implemented yet")
2067 }
2068 #[allow(deprecated)]
2069 async fn delete_topic_in_broker_concurrent(
2070 &self,
2071 _addrs: HashSet<CheetahString>,
2072 _topic: CheetahString,
2073 ) -> AdminToolResult<BrokerOperatorResult> {
2074 unimplemented!("delete_topic_in_broker_concurrent not implemented yet")
2075 }
2076
2077 async fn reset_offset_by_timestamp_old(
2078 &self,
2079 cluster_name: Option<CheetahString>,
2080 consumer_group: CheetahString,
2081 topic: CheetahString,
2082 timestamp: u64,
2083 force: bool,
2084 ) -> rocketmq_error::RocketMQResult<Vec<RollbackStats>> {
2085 let mut route_topic = topic.clone();
2086 if !topic.is_empty()
2087 && (mix_all::is_lmq(Some(topic.as_str()))
2088 || topic.as_str() == format!("{}wheel_timer", TopicValidator::SYSTEM_TOPIC_PREFIX))
2089 && cluster_name.as_ref().is_some_and(|name| !name.is_empty())
2090 {
2091 route_topic = cluster_name.unwrap();
2092 }
2093 let topic_route_data = self.examine_topic_route_info(route_topic).await?;
2094 let mut rollback_stats_list = Vec::new();
2095
2096 if let Some(route_data) = topic_route_data {
2097 let mut topic_route_map = HashMap::new();
2098 for queue_data in &route_data.queue_datas {
2099 topic_route_map.insert(queue_data.broker_name().to_string(), queue_data.clone());
2100 }
2101
2102 for broker_data in &route_data.broker_datas {
2103 if let Some(addr) = broker_data.select_broker_addr() {
2104 if let Some(queue_data) = topic_route_map.get(broker_data.broker_name().as_str()) {
2105 let mut rollback_stats = self
2106 .reset_offset_by_timestamp_old_on_broker(
2107 addr,
2108 queue_data,
2109 consumer_group.clone(),
2110 topic.clone(),
2111 timestamp as i64,
2112 force,
2113 )
2114 .await?;
2115 rollback_stats_list.append(&mut rollback_stats);
2116 }
2117 }
2118 }
2119 }
2120
2121 Ok(rollback_stats_list)
2122 }
2123 #[allow(deprecated)]
2124 async fn reset_offset_new_concurrent(
2125 &self,
2126 _group: CheetahString,
2127 _topic: CheetahString,
2128 _timestamp: u64,
2129 ) -> AdminToolResult<BrokerOperatorResult> {
2130 unimplemented!("reset_offset_new_concurrent not implemented yet")
2131 }
2132
2133 async fn query_consume_time_span(
2134 &self,
2135 _topic: CheetahString,
2136 _group: CheetahString,
2137 ) -> rocketmq_error::RocketMQResult<Vec<QueueTimeSpan>> {
2138 unimplemented!("query_consume_time_span not implemented yet")
2139 }
2140
2141 async fn query_consume_time_span_concurrent(
2142 &self,
2143 _topic: CheetahString,
2144 _group: CheetahString,
2145 ) -> AdminToolResult<Vec<QueueTimeSpan>> {
2146 unimplemented!("query_consume_time_span_concurrent not implemented yet")
2147 }
2148 #[allow(deprecated)]
2149 async fn message_track_detail(&self, msg: MessageExt) -> rocketmq_error::RocketMQResult<Vec<MessageTrack>> {
2150 let group_list = self.query_topic_consume_by_who(msg.topic().clone()).await?;
2151 let mut result = Vec::with_capacity(group_list.get_group_list().len());
2152
2153 for group in group_list.get_group_list() {
2154 let mut track = build_message_track(group.as_str());
2155 let consumer_connection = match self.examine_consumer_connection_info(group.clone(), None).await {
2156 Ok(connection) => connection,
2157 Err(error) => {
2158 apply_track_error(&mut track, &error);
2159 result.push(track);
2160 continue;
2161 }
2162 };
2163
2164 match consumer_connection.get_consume_type() {
2165 Some(ConsumeType::ConsumeActively) => {
2166 track.set_track_type(TrackType::Pull);
2167 }
2168 Some(ConsumeType::ConsumePassively) => {
2169 if consumer_connection.get_message_model() == Some(MessageModel::Broadcasting) {
2170 track.set_track_type(TrackType::ConsumeBroadcasting);
2171 result.push(track);
2172 continue;
2173 }
2174
2175 let consumed = match self.message_consumed_by_group(&msg, group).await {
2176 Ok(consumed) => consumed,
2177 Err(error) => {
2178 apply_track_error(&mut track, &error);
2179 result.push(track);
2180 continue;
2181 }
2182 };
2183
2184 if consumed {
2185 track.set_track_type(resolve_consumed_track_type(&msg, &consumer_connection));
2186 } else {
2187 track.set_track_type(TrackType::NotConsumedYet);
2188 }
2189 }
2190 _ => {}
2191 }
2192
2193 result.push(track);
2194 }
2195
2196 result.sort_by(|left, right| left.consumer_group.cmp(&right.consumer_group));
2197 Ok(result)
2198 }
2199 #[allow(deprecated)]
2200 async fn message_track_detail_concurrent(&self, msg: MessageExt) -> AdminToolResult<Vec<MessageTrack>> {
2201 match self.message_track_detail(msg).await {
2202 Ok(data) => AdminToolResult::success(data),
2203 Err(error) => AdminToolResult::failure(admin_result_code_for_error(&error), error.to_string()),
2204 }
2205 }
2206
2207 async fn view_broker_stats_data(
2208 &self,
2209 broker_addr: CheetahString,
2210 stats_name: CheetahString,
2211 stats_key: CheetahString,
2212 ) -> rocketmq_error::RocketMQResult<BrokerStatsData> {
2213 let request_header = ViewBrokerStatsDataRequestHeader { stats_name, stats_key };
2214 self.client_instance
2215 .as_ref()
2216 .unwrap()
2217 .get_mq_client_api_impl()
2218 .view_broker_stats_data(&broker_addr, request_header, self.timeout_millis.as_millis() as u64)
2219 .await
2220 }
2221
2222 async fn fetch_consume_stats_in_broker(
2223 &self,
2224 _broker_addr: CheetahString,
2225 _is_order: bool,
2226 _timeout_millis: u64,
2227 ) -> rocketmq_error::RocketMQResult<ConsumeStatsList> {
2228 unimplemented!("fetch_consume_stats_in_broker not implemented yet")
2229 }
2230
2231 async fn get_all_subscription_group(
2232 &self,
2233 broker_addr: CheetahString,
2234 timeout_millis: u64,
2235 ) -> rocketmq_error::RocketMQResult<SubscriptionGroupWrapper> {
2236 self.client_instance
2237 .as_ref()
2238 .unwrap()
2239 .get_mq_client_api_impl()
2240 .get_all_subscription_group_config(&broker_addr, timeout_millis)
2241 .await
2242 }
2243
2244 async fn get_user_subscription_group(
2245 &self,
2246 broker_addr: CheetahString,
2247 timeout_millis: u64,
2248 ) -> rocketmq_error::RocketMQResult<SubscriptionGroupWrapper> {
2249 let subscription_group_wrapper = self.get_all_subscription_group(broker_addr, timeout_millis).await?;
2250
2251 let system_group_set = get_system_group_set();
2252 let table = subscription_group_wrapper.get_subscription_group_table();
2253 table.retain(|key, _| !mix_all::is_sys_consumer_group(key.as_str()) && !system_group_set.contains(key));
2255
2256 Ok(subscription_group_wrapper)
2257 }
2258
2259 async fn query_consume_queue(
2260 &self,
2261 broker_addr: CheetahString,
2262 topic: CheetahString,
2263 queue_id: i32,
2264 index: u64,
2265 count: i32,
2266 consumer_group: CheetahString,
2267 ) -> rocketmq_error::RocketMQResult<QueryConsumeQueueResponseBody> {
2268 self.client_instance
2269 .as_ref()
2270 .unwrap()
2271 .get_mq_client_api_impl()
2272 .query_consume_queue(
2273 &broker_addr,
2274 topic,
2275 queue_id,
2276 index as i64,
2277 count,
2278 consumer_group,
2279 self.timeout_millis.as_millis() as u64,
2280 )
2281 .await
2282 }
2283
2284 async fn update_and_get_group_read_forbidden(
2285 &self,
2286 _broker_addr: CheetahString,
2287 _group_name: CheetahString,
2288 _topic_name: CheetahString,
2289 _readable: Option<bool>,
2290 ) -> rocketmq_error::RocketMQResult<GroupForbidden> {
2291 unimplemented!("update_and_get_group_read_forbidden not implemented yet")
2292 }
2293
2294 async fn query_message(
2295 &self,
2296 _cluster_name: CheetahString,
2297 topic: CheetahString,
2298 msg_id: CheetahString,
2299 ) -> rocketmq_error::RocketMQResult<MessageExt> {
2300 let client_instance = self
2301 .client_instance
2302 .as_ref()
2303 .ok_or(rocketmq_error::RocketMQError::ClientNotStarted)?;
2304
2305 let msg_id_str = msg_id.as_str();
2306
2307 if let Err(e) = message_decoder::validate_message_id(msg_id_str) {
2308 return Err(rocketmq_error::RocketMQError::IllegalArgument(format!(
2309 "Invalid message ID: {}",
2310 e
2311 )));
2312 }
2313
2314 let message_id = message_decoder::decode_message_id(msg_id_str).map_err(|e| {
2315 rocketmq_error::RocketMQError::IllegalArgument(format!("Failed to decode message ID: {}", e))
2316 })?;
2317 let broker_addr =
2318 CheetahString::from_string(format!("{}:{}", message_id.address.ip(), message_id.address.port()));
2319
2320 let request_header = ViewMessageRequestHeader {
2321 topic: Some(topic),
2322 offset: message_id.offset,
2323 };
2324
2325 client_instance
2326 .get_mq_client_api_impl()
2327 .view_message(&broker_addr, request_header, self.timeout_millis.as_millis() as u64)
2328 .await
2329 }
2330
2331 async fn get_broker_ha_status(&self, broker_addr: CheetahString) -> rocketmq_error::RocketMQResult<HARuntimeInfo> {
2332 if let Some(ref mq_client_instance) = self.client_instance {
2333 Ok(mq_client_instance
2334 .get_mq_client_api_impl()
2335 .get_broker_ha_status(broker_addr, self.timeout_millis.as_millis() as u64)
2336 .await?)
2337 } else {
2338 Err(rocketmq_error::RocketMQError::ClientNotStarted)
2339 }
2340 }
2341
2342 async fn get_in_sync_state_data(
2343 &self,
2344 controller_address: CheetahString,
2345 brokers: Vec<CheetahString>,
2346 ) -> rocketmq_error::RocketMQResult<BrokerReplicasInfo> {
2347 if let Some(ref mq_client_instance) = self.client_instance {
2348 Ok(mq_client_instance
2349 .get_mq_client_api_impl()
2350 .get_in_sync_state_data(controller_address, brokers, self.timeout_millis.as_millis() as u64)
2351 .await?)
2352 } else {
2353 Err(rocketmq_error::RocketMQError::ClientNotStarted)
2354 }
2355 }
2356
2357 async fn get_broker_epoch_cache(
2358 &self,
2359 broker_addr: CheetahString,
2360 ) -> rocketmq_error::RocketMQResult<EpochEntryCache> {
2361 if let Some(ref mq_client_instance) = self.client_instance {
2362 Ok(mq_client_instance
2363 .get_mq_client_api_impl()
2364 .get_broker_epoch_cache(broker_addr, self.timeout_millis.as_millis() as u64)
2365 .await?)
2366 } else {
2367 Err(rocketmq_error::RocketMQError::ClientNotStarted)
2368 }
2369 }
2370
2371 async fn elect_master(
2372 &self,
2373 _controller_addr: CheetahString,
2374 _cluster_name: CheetahString,
2375 _broker_name: CheetahString,
2376 _broker_id: Option<u64>,
2377 ) -> rocketmq_error::RocketMQResult<(ElectMasterResponseHeader, BrokerMemberGroup)> {
2378 unimplemented!("elect_master not implemented yet")
2379 }
2380
2381 async fn create_user_with_info(
2382 &self,
2383 _broker_addr: CheetahString,
2384 _username: CheetahString,
2385 _password: CheetahString,
2386 ) -> rocketmq_error::RocketMQResult<()> {
2387 unimplemented!("create_user_with_info not implemented yet")
2388 }
2389
2390 async fn update_user_with_info(
2391 &self,
2392 _broker_addr: CheetahString,
2393 _username: CheetahString,
2394 _password: CheetahString,
2395 ) -> rocketmq_error::RocketMQResult<()> {
2396 unimplemented!("update_user_with_info not implemented yet")
2397 }
2398
2399 async fn get_user(
2400 &self,
2401 broker_addr: CheetahString,
2402 username: CheetahString,
2403 ) -> rocketmq_error::RocketMQResult<Option<UserInfo>> {
2404 if let Some(ref mq_client_instance) = self.client_instance {
2405 let mq_client_api = mq_client_instance.get_mq_client_api_impl();
2406 let timeout_millis = self.timeout_millis.as_millis() as u64;
2407 let result = mq_client_api.get_user(broker_addr, username, timeout_millis).await?;
2408 Ok(result)
2409 } else {
2410 Err(rocketmq_error::RocketMQError::ClientNotStarted)
2411 }
2412 }
2413
2414 async fn list_users(
2415 &self,
2416 broker_addr: CheetahString,
2417 filter: CheetahString,
2418 ) -> rocketmq_error::RocketMQResult<Vec<UserInfo>> {
2419 if let Some(ref mq_client_instance) = self.client_instance {
2420 let mq_client_api = mq_client_instance.get_mq_client_api_impl();
2421 let timeout_millis = self.timeout_millis.as_millis() as u64;
2422 let result = mq_client_api.list_users(broker_addr, filter, timeout_millis).await?;
2423 Ok(result)
2424 } else {
2425 Err(rocketmq_error::RocketMQError::ClientNotStarted)
2426 }
2427 }
2428
2429 async fn create_acl_with_info(
2430 &self,
2431 _broker_addr: CheetahString,
2432 _subject: CheetahString,
2433 ) -> rocketmq_error::RocketMQResult<()> {
2434 unimplemented!("create_acl_with_info not implemented yet")
2435 }
2436
2437 async fn update_acl_with_info(
2438 &self,
2439 _broker_addr: CheetahString,
2440 _subject: CheetahString,
2441 ) -> rocketmq_error::RocketMQResult<()> {
2442 unimplemented!("update_acl_with_info not implemented yet")
2443 }
2444
2445 async fn get_acl(
2446 &self,
2447 _broker_addr: CheetahString,
2448 _subject: CheetahString,
2449 ) -> rocketmq_error::RocketMQResult<AclInfo> {
2450 unimplemented!("get_acl not implemented yet")
2451 }
2452
2453 async fn list_acl(
2454 &self,
2455 broker_addr: CheetahString,
2456 subject_filter: CheetahString,
2457 resource_filter: CheetahString,
2458 ) -> rocketmq_error::RocketMQResult<Vec<AclInfo>> {
2459 if let Some(ref mq_client_instance) = self.client_instance {
2460 let mq_client_api = mq_client_instance.get_mq_client_api_impl();
2461 let timeout_millis = self.timeout_millis.as_millis() as u64;
2462 let result = mq_client_api
2463 .list_acl(broker_addr, subject_filter, resource_filter, timeout_millis)
2464 .await?;
2465 Ok(result)
2466 } else {
2467 Err(rocketmq_error::RocketMQError::ClientNotStarted)
2468 }
2469 }
2470
2471 async fn get_broker_lite_info(
2472 &self,
2473 broker_addr: CheetahString,
2474 ) -> rocketmq_error::RocketMQResult<GetBrokerLiteInfoResponseBody> {
2475 if let Some(ref mq_client_instance) = self.client_instance {
2476 mq_client_instance
2477 .get_mq_client_api_impl()
2478 .get_broker_lite_info(&broker_addr, self.timeout_millis.as_millis() as u64)
2479 .await
2480 } else {
2481 Err(rocketmq_error::RocketMQError::ClientNotStarted)
2482 }
2483 }
2484
2485 async fn get_parent_topic_info(
2486 &self,
2487 broker_addr: CheetahString,
2488 topic: CheetahString,
2489 ) -> rocketmq_error::RocketMQResult<GetParentTopicInfoResponseBody> {
2490 self.client_instance
2491 .as_ref()
2492 .unwrap()
2493 .get_mq_client_api_impl()
2494 .get_parent_topic_info(&broker_addr, topic, self.timeout_millis.as_millis() as u64)
2495 .await
2496 }
2497
2498 async fn get_lite_topic_info(
2499 &self,
2500 broker_addr: CheetahString,
2501 parent_topic: CheetahString,
2502 lite_topic: CheetahString,
2503 ) -> rocketmq_error::RocketMQResult<GetLiteTopicInfoResponseBody> {
2504 if let Some(ref mq_client_instance) = self.client_instance {
2505 mq_client_instance
2506 .get_mq_client_api_impl()
2507 .get_lite_topic_info(
2508 &broker_addr,
2509 &parent_topic,
2510 &lite_topic,
2511 self.timeout_millis.as_millis() as u64,
2512 )
2513 .await
2514 } else {
2515 Err(rocketmq_error::RocketMQError::ClientNotStarted)
2516 }
2517 }
2518
2519 async fn get_lite_client_info(
2520 &self,
2521 _broker_addr: CheetahString,
2522 _parent_topic: CheetahString,
2523 _group: CheetahString,
2524 _client_id: CheetahString,
2525 ) -> rocketmq_error::RocketMQResult<GetLiteClientInfoResponseBody> {
2526 unimplemented!("get_lite_client_info not implemented yet")
2527 }
2528
2529 async fn get_lite_group_info(
2530 &self,
2531 broker_addr: CheetahString,
2532 group: CheetahString,
2533 lite_topic: CheetahString,
2534 top_k: i32,
2535 ) -> rocketmq_error::RocketMQResult<GetLiteGroupInfoResponseBody> {
2536 self.client_instance
2537 .as_ref()
2538 .unwrap()
2539 .get_mq_client_api_impl()
2540 .get_lite_group_info(
2541 &broker_addr,
2542 group,
2543 lite_topic,
2544 top_k,
2545 self.timeout_millis.as_millis() as u64,
2546 )
2547 .await
2548 }
2549
2550 async fn export_rocksdb_config_to_json(
2551 &self,
2552 _broker_addr: CheetahString,
2553 _config_types: Vec<CheetahString>,
2554 ) -> rocketmq_error::RocketMQResult<()> {
2555 unimplemented!("export_rocksdb_config_to_json not implemented yet")
2556 }
2557
2558 async fn search_offset(
2559 &self,
2560 broker_addr: CheetahString,
2561 topic_name: CheetahString,
2562 queue_id: i32,
2563 timestamp: u64,
2564 timeout_millis: u64,
2565 ) -> rocketmq_error::RocketMQResult<u64> {
2566 let mq = MessageQueue::from_parts(&topic_name, "", queue_id);
2567 let offset = self
2568 .client_instance
2569 .as_ref()
2570 .unwrap()
2571 .get_mq_client_api_impl()
2572 .search_offset_by_timestamp(
2573 broker_addr.as_str(),
2574 &mq,
2575 timestamp as i64,
2576 rocketmq_common::common::boundary_type::BoundaryType::Lower,
2577 timeout_millis,
2578 )
2579 .await?;
2580 Ok(offset as u64)
2581 }
2582
2583 async fn min_offset(
2584 &self,
2585 broker_addr: CheetahString,
2586 message_queue: MessageQueue,
2587 timeout_millis: u64,
2588 ) -> rocketmq_error::RocketMQResult<i64> {
2589 self.client_instance
2590 .as_ref()
2591 .unwrap()
2592 .get_mq_client_api_impl()
2593 .get_min_offset(broker_addr.as_str(), &message_queue, timeout_millis)
2594 .await
2595 }
2596
2597 async fn max_offset(
2598 &self,
2599 broker_addr: CheetahString,
2600 message_queue: MessageQueue,
2601 timeout_millis: u64,
2602 ) -> rocketmq_error::RocketMQResult<i64> {
2603 self.client_instance
2604 .as_ref()
2605 .unwrap()
2606 .get_mq_client_api_impl()
2607 .get_max_offset(broker_addr.as_str(), &message_queue, timeout_millis)
2608 .await
2609 }
2610}
2611
2612impl DefaultMQAdminExtImpl {
2613 async fn reset_offset_by_timestamp_old_on_broker(
2614 &self,
2615 broker_addr: CheetahString,
2616 queue_data: &QueueData,
2617 consumer_group: CheetahString,
2618 topic: CheetahString,
2619 timestamp: i64,
2620 force: bool,
2621 ) -> rocketmq_error::RocketMQResult<Vec<RollbackStats>> {
2622 let consume_stats = self
2623 .client_instance
2624 .as_ref()
2625 .unwrap()
2626 .get_mq_client_api_impl()
2627 .get_consume_stats(
2628 &broker_addr,
2629 GetConsumeStatsRequestHeader {
2630 consumer_group: consumer_group.clone(),
2631 topic: CheetahString::empty(),
2632 topic_request_header: None,
2633 },
2634 self.timeout_millis.as_millis() as u64,
2635 )
2636 .await?;
2637
2638 let mut rollback_stats_list = Vec::new();
2639 let mut has_consumed = false;
2640
2641 for (queue, offset_wrapper) in &consume_stats.offset_table {
2642 if queue.topic() == &topic {
2643 has_consumed = true;
2644 rollback_stats_list.push(
2645 self.reset_offset_consume_offset(
2646 broker_addr.clone(),
2647 consumer_group.clone(),
2648 queue.clone(),
2649 offset_wrapper,
2650 timestamp,
2651 force,
2652 )
2653 .await?,
2654 );
2655 }
2656 }
2657
2658 if !has_consumed {
2659 let topic_status = self
2660 .client_instance
2661 .as_ref()
2662 .unwrap()
2663 .get_mq_client_api_impl()
2664 .get_topic_stats_info(
2665 &broker_addr,
2666 GetTopicStatsInfoRequestHeader {
2667 topic: topic.clone(),
2668 topic_request_header: None,
2669 },
2670 self.timeout_millis.as_millis() as u64,
2671 )
2672 .await?;
2673
2674 for queue_id in 0..queue_data.read_queue_nums() {
2675 let queue = MessageQueue::from_parts(topic.clone(), queue_data.broker_name().clone(), queue_id as i32);
2676 let mut offset_wrapper = OffsetWrapper::new();
2677 let topic_offset = topic_status
2678 .get_offset_table()
2679 .get(&queue)
2680 .cloned()
2681 .unwrap_or_else(TopicOffset::new);
2682 offset_wrapper.set_broker_offset(topic_offset.get_max_offset());
2683 offset_wrapper.set_consumer_offset(topic_offset.get_min_offset());
2684 rollback_stats_list.push(
2685 self.reset_offset_consume_offset(
2686 broker_addr.clone(),
2687 consumer_group.clone(),
2688 queue,
2689 &offset_wrapper,
2690 timestamp,
2691 force,
2692 )
2693 .await?,
2694 );
2695 }
2696 }
2697
2698 Ok(rollback_stats_list)
2699 }
2700
2701 async fn reset_offset_consume_offset(
2702 &self,
2703 broker_addr: CheetahString,
2704 consumer_group: CheetahString,
2705 queue: MessageQueue,
2706 offset_wrapper: &OffsetWrapper,
2707 timestamp: i64,
2708 force: bool,
2709 ) -> rocketmq_error::RocketMQResult<RollbackStats> {
2710 let reset_offset = if timestamp == -1 {
2711 self.client_instance
2712 .as_ref()
2713 .unwrap()
2714 .get_mq_client_api_impl()
2715 .get_max_offset(broker_addr.as_str(), &queue, self.timeout_millis.as_millis() as u64)
2716 .await?
2717 } else {
2718 self.client_instance
2719 .as_ref()
2720 .unwrap()
2721 .get_mq_client_api_impl()
2722 .search_offset_by_timestamp(
2723 broker_addr.as_str(),
2724 &queue,
2725 timestamp,
2726 rocketmq_common::common::boundary_type::BoundaryType::Lower,
2727 self.timeout_millis.as_millis() as u64,
2728 )
2729 .await?
2730 };
2731
2732 let mut rollback_stats = RollbackStats {
2733 broker_name: queue.broker_name().clone(),
2734 queue_id: queue.queue_id() as i64,
2735 broker_offset: offset_wrapper.get_broker_offset(),
2736 consumer_offset: offset_wrapper.get_consumer_offset(),
2737 timestamp_offset: reset_offset,
2738 rollback_offset: offset_wrapper.get_consumer_offset(),
2739 };
2740
2741 if force || reset_offset <= offset_wrapper.get_consumer_offset() {
2742 rollback_stats.rollback_offset = reset_offset;
2743 self.client_instance
2744 .as_ref()
2745 .unwrap()
2746 .get_mq_client_api_impl()
2747 .update_consumer_offset(
2748 &broker_addr,
2749 UpdateConsumerOffsetRequestHeader {
2750 consumer_group,
2751 topic: queue.topic().clone(),
2752 queue_id: queue.queue_id(),
2753 commit_offset: reset_offset,
2754 topic_request_header: None,
2755 },
2756 self.timeout_millis.as_millis() as u64,
2757 )
2758 .await?;
2759 }
2760
2761 Ok(rollback_stats)
2762 }
2763
2764 async fn message_consumed_by_group(
2765 &self,
2766 msg: &MessageExt,
2767 group: &CheetahString,
2768 ) -> rocketmq_error::RocketMQResult<bool> {
2769 let consume_stats = self
2770 .examine_consume_stats(group.clone(), None, None, None, None)
2771 .await?;
2772 let cluster_info = self.examine_broker_cluster_info().await?;
2773
2774 Ok(is_message_consumed(msg, &consume_stats, &cluster_info))
2775 }
2776}
2777
2778fn merge_order_conf_entries(existing: &str, value: &str) -> String {
2779 let mut entries = HashMap::new();
2780 for item in existing.split(';').filter(|item| !item.trim().is_empty()) {
2781 if let Some((broker_name, _)) = item.split_once(':') {
2782 entries.insert(broker_name.to_string(), item.to_string());
2783 }
2784 }
2785 if let Some((broker_name, _)) = value.split_once(':') {
2786 entries.insert(broker_name.to_string(), value.to_string());
2787 } else if !value.trim().is_empty() {
2788 entries.insert(value.to_string(), value.to_string());
2789 }
2790
2791 let mut broker_names: Vec<String> = entries.keys().cloned().collect();
2792 broker_names.sort();
2793 broker_names
2794 .into_iter()
2795 .filter_map(|broker_name| entries.remove(&broker_name))
2796 .collect::<Vec<_>>()
2797 .join(";")
2798}
2799
2800fn select_consumer_direct_connection(
2801 consumer_group: &CheetahString,
2802 consumer_connection: &ConsumerConnection,
2803 requested_client_id: Option<&CheetahString>,
2804) -> rocketmq_error::RocketMQResult<(CheetahString, CheetahString)> {
2805 let requested = requested_client_id.filter(|client_id| !client_id.is_empty());
2806 let connection = consumer_connection
2807 .get_connection_set()
2808 .iter()
2809 .find(|connection| {
2810 requested
2811 .map(|client_id| connection.get_client_id() == *client_id)
2812 .unwrap_or_else(|| !connection.get_client_id().is_empty())
2813 })
2814 .ok_or_else(|| {
2815 let message = requested
2816 .map(|client_id| {
2817 format!(
2818 "Client `{}` was not found in consumer group `{}`",
2819 client_id, consumer_group
2820 )
2821 })
2822 .unwrap_or_else(|| format!("NO CONSUMER for consumer group `{}`", consumer_group));
2823 rocketmq_error::RocketMQError::IllegalArgument(message)
2824 })?;
2825
2826 Ok((connection.get_client_id(), connection.get_client_addr()))
2827}
2828
2829#[allow(deprecated)]
2830fn build_message_track(consumer_group: &str) -> MessageTrack {
2831 MessageTrack {
2832 consumer_group: consumer_group.to_string(),
2833 track_type: Some(TrackType::Unknown),
2834 exception_desc: String::new(),
2835 }
2836}
2837
2838#[allow(deprecated)]
2839fn resolve_consumed_track_type(msg: &MessageExt, consumer_connection: &ConsumerConnection) -> TrackType {
2840 let Some(subscription_data) = consumer_connection.get_subscription_table().get(msg.topic()) else {
2841 return TrackType::Consumed;
2842 };
2843
2844 let Some(message_tag) = msg.get_tags() else {
2845 return TrackType::Consumed;
2846 };
2847
2848 if subscription_data.tags_set.is_empty()
2849 || subscription_data
2850 .tags_set
2851 .contains(&CheetahString::from_static_str(SubscriptionData::SUB_ALL))
2852 || subscription_data.tags_set.contains(&message_tag)
2853 {
2854 TrackType::Consumed
2855 } else {
2856 TrackType::ConsumedButFiltered
2857 }
2858}
2859
2860fn is_message_consumed(msg: &MessageExt, consume_stats: &ConsumeStats, cluster_info: &ClusterInfo) -> bool {
2861 consume_stats.get_offset_table().iter().any(|(queue, offset_wrapper)| {
2862 queue.topic() == msg.topic()
2863 && queue.queue_id() == msg.queue_id()
2864 && resolve_master_broker_addr(cluster_info, queue)
2865 .map(|broker_addr| {
2866 broker_addr_matches_store_host(broker_addr, msg.store_host())
2867 && offset_wrapper.get_consumer_offset() > msg.queue_offset()
2868 })
2869 .unwrap_or(false)
2870 })
2871}
2872
2873fn resolve_master_broker_addr<'a>(cluster_info: &'a ClusterInfo, queue: &MessageQueue) -> Option<&'a CheetahString> {
2874 cluster_info
2875 .broker_addr_table
2876 .as_ref()?
2877 .get(queue.broker_name())?
2878 .broker_addrs()
2879 .get(&mix_all::MASTER_ID)
2880}
2881
2882fn broker_addr_matches_store_host(broker_addr: &CheetahString, store_host: std::net::SocketAddr) -> bool {
2883 broker_addr
2884 .parse::<std::net::SocketAddr>()
2885 .map(|parsed| parsed == store_host)
2886 .unwrap_or_else(|_| broker_addr.as_str() == store_host.to_string())
2887}
2888
2889#[allow(deprecated)]
2890fn apply_track_error(track: &mut MessageTrack, error: &RocketMQError) {
2891 if let Some(code) = response_code_from_error(error) {
2892 match code {
2893 ResponseCode::ConsumerNotOnline => track.set_track_type(TrackType::NotOnline),
2894 ResponseCode::BroadcastConsumption => track.set_track_type(TrackType::ConsumeBroadcasting),
2895 _ => {}
2896 }
2897 }
2898
2899 track.set_exception_desc(track_exception_desc(error));
2900}
2901
2902fn response_code_from_error(error: &RocketMQError) -> Option<ResponseCode> {
2903 match error {
2904 RocketMQError::BrokerOperationFailed { code, .. } => Some(ResponseCode::from(*code)),
2905 RocketMQError::IllegalArgument(message) => parse_response_code_from_message(message),
2906 _ => None,
2907 }
2908}
2909
2910fn parse_response_code_from_message(message: &str) -> Option<ResponseCode> {
2911 let code_start = message.find("CODE:")?;
2912 let digits = message[code_start + "CODE:".len()..]
2913 .trim_start()
2914 .chars()
2915 .take_while(|ch| ch.is_ascii_digit() || *ch == '-')
2916 .collect::<String>();
2917
2918 if digits.is_empty() {
2919 return None;
2920 }
2921
2922 digits.parse::<i32>().ok().map(ResponseCode::from)
2923}
2924
2925fn track_exception_desc(error: &RocketMQError) -> String {
2926 match error {
2927 RocketMQError::BrokerOperationFailed { code, message, .. } => format!("CODE:{code} DESC:{message}"),
2928 _ => error.to_string(),
2929 }
2930}
2931
2932fn admin_result_code_for_error(error: &RocketMQError) -> AdminToolsResultCodeEnum {
2933 match response_code_from_error(error) {
2934 Some(ResponseCode::ConsumerNotOnline) => AdminToolsResultCodeEnum::ConsumerNotOnline,
2935 Some(ResponseCode::BroadcastConsumption) => AdminToolsResultCodeEnum::BroadcastConsumption,
2936 Some(_) => AdminToolsResultCodeEnum::MQBrokerError,
2937 None => AdminToolsResultCodeEnum::MQClientError,
2938 }
2939}
2940
2941#[cfg(test)]
2942mod tests {
2943 use std::collections::BTreeSet;
2944 use std::collections::HashMap;
2945
2946 use cheetah_string::CheetahString;
2947 use rocketmq_common::common::message::message_builder::MessageBuilder;
2948 use rocketmq_common::common::message::message_ext::MessageExt;
2949 use rocketmq_common::common::message::message_queue::MessageQueue;
2950 use rocketmq_common::common::mix_all;
2951 #[allow(deprecated)]
2952 use rocketmq_common::common::tools::track_type::TrackType;
2953 use rocketmq_remoting::code::response_code::ResponseCode;
2954 use rocketmq_remoting::protocol::admin::consume_stats::ConsumeStats;
2955 use rocketmq_remoting::protocol::admin::offset_wrapper::OffsetWrapper;
2956 use rocketmq_remoting::protocol::body::broker_body::cluster_info::ClusterInfo;
2957 use rocketmq_remoting::protocol::body::connection::Connection;
2958 use rocketmq_remoting::protocol::body::consumer_connection::ConsumerConnection;
2959 use rocketmq_remoting::protocol::body::producer_connection::ProducerConnection;
2960 use rocketmq_remoting::protocol::heartbeat::consume_type::ConsumeType;
2961 use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
2962 use rocketmq_remoting::protocol::route::route_data_view::BrokerData;
2963
2964 use super::encode_topic_attributes;
2965 use super::is_message_consumed;
2966 use super::merge_order_conf_entries;
2967 use super::parse_response_code_from_message;
2968 use super::resolve_consumed_track_type;
2969 use super::select_consumer_direct_connection;
2970
2971 #[test]
2972 fn merge_order_conf_entries_replaces_existing_broker_value() {
2973 let merged = merge_order_conf_entries("broker-a:4;broker-b:4", "broker-a:8");
2974 assert_eq!(merged, "broker-a:8;broker-b:4");
2975 }
2976
2977 #[test]
2978 fn merge_order_conf_entries_adds_new_broker_value() {
2979 let merged = merge_order_conf_entries("broker-a:4", "broker-b:8");
2980 assert_eq!(merged, "broker-a:4;broker-b:8");
2981 }
2982
2983 #[test]
2984 fn encode_topic_attributes_matches_java_attribute_parser_format() {
2985 let mut attributes = HashMap::<CheetahString, CheetahString>::new();
2986 attributes.insert("+message.type".into(), "NORMAL".into());
2987
2988 let encoded = encode_topic_attributes(&attributes);
2989
2990 assert_eq!(encoded, Some(CheetahString::from("+message.type=NORMAL")));
2991 }
2992
2993 #[test]
2994 fn producer_connection_empty_set_represents_offline_group() {
2995 let connection = ProducerConnection::new();
2996 assert!(connection.connection_set().is_empty());
2997 }
2998
2999 #[test]
3000 fn producer_connection_with_entries_represents_online_group() {
3001 let mut connection = ProducerConnection::new();
3002 let mut entry = Connection::new();
3003 entry.set_client_id("client-a".into());
3004 connection.connection_set_mut().insert(entry);
3005
3006 assert_eq!(connection.connection_set().len(), 1);
3007 }
3008
3009 #[test]
3010 fn select_consumer_direct_connection_uses_requested_client_when_present() {
3011 let consumer_group = CheetahString::from("group-a");
3012 let requested_client_id = CheetahString::from("client-b");
3013 let mut consumer_connection = ConsumerConnection::new();
3014 let mut first = Connection::new();
3015 first.set_client_id("client-a".into());
3016 first.set_client_addr("127.0.0.1:1001".into());
3017 let mut second = Connection::new();
3018 second.set_client_id(requested_client_id.clone());
3019 second.set_client_addr("127.0.0.1:1002".into());
3020 consumer_connection.insert_connection(first);
3021 consumer_connection.insert_connection(second);
3022
3023 let (client_id, client_addr) =
3024 select_consumer_direct_connection(&consumer_group, &consumer_connection, Some(&requested_client_id))
3025 .expect("requested client should be selected");
3026
3027 assert_eq!(client_id, requested_client_id);
3028 assert_eq!(client_addr, CheetahString::from("127.0.0.1:1002"));
3029 }
3030
3031 #[test]
3032 fn select_consumer_direct_connection_returns_first_available_client_when_unspecified() {
3033 let consumer_group = CheetahString::from("group-a");
3034 let mut consumer_connection = ConsumerConnection::new();
3035 let mut only = Connection::new();
3036 only.set_client_id("client-a".into());
3037 only.set_client_addr("127.0.0.1:1001".into());
3038 consumer_connection.insert_connection(only);
3039
3040 let (client_id, client_addr) =
3041 select_consumer_direct_connection(&consumer_group, &consumer_connection, Some(&CheetahString::default()))
3042 .expect("single consumer should be selected");
3043
3044 assert_eq!(client_id, CheetahString::from("client-a"));
3045 assert_eq!(client_addr, CheetahString::from("127.0.0.1:1001"));
3046 }
3047
3048 #[test]
3049 fn select_consumer_direct_connection_errors_when_group_is_offline() {
3050 let consumer_group = CheetahString::from("group-a");
3051 let consumer_connection = ConsumerConnection::new();
3052
3053 let error = select_consumer_direct_connection(&consumer_group, &consumer_connection, None)
3054 .expect_err("offline group should not resolve a client");
3055
3056 assert!(error.to_string().contains("NO CONSUMER"));
3057 }
3058
3059 #[test]
3060 #[allow(deprecated)]
3061 fn resolve_consumed_track_type_marks_filtered_subscription() {
3062 let message = MessageBuilder::new()
3063 .topic("TopicTest")
3064 .body_slice(b"payload")
3065 .tags("TagA")
3066 .build_unchecked();
3067 let mut message_ext = MessageExt::default();
3068 message_ext.set_message_inner(message);
3069
3070 let mut subscription = SubscriptionData {
3071 topic: CheetahString::from("TopicTest"),
3072 ..Default::default()
3073 };
3074 subscription.tags_set = BTreeSet::from([CheetahString::from("TagB")]);
3075
3076 let mut connection = ConsumerConnection::new();
3077 connection.set_consume_type(ConsumeType::ConsumePassively);
3078 connection
3079 .get_subscription_table_mut()
3080 .insert(CheetahString::from("TopicTest"), subscription);
3081
3082 let track_type = resolve_consumed_track_type(&message_ext, &connection);
3083
3084 assert_eq!(track_type, TrackType::ConsumedButFiltered);
3085 }
3086
3087 #[test]
3088 fn is_message_consumed_returns_true_when_offset_has_advanced_on_master() {
3089 let message = MessageBuilder::new()
3090 .topic("TopicTest")
3091 .body_slice(b"payload")
3092 .build_unchecked();
3093 let mut message_ext = MessageExt::default();
3094 message_ext.set_message_inner(message);
3095 message_ext.set_queue_id(1);
3096 message_ext.set_queue_offset(10);
3097 message_ext.set_store_host("127.0.0.1:10911".parse().expect("store host"));
3098
3099 let mut consume_stats = ConsumeStats::new();
3100 let mut offset_wrapper = OffsetWrapper::default();
3101 offset_wrapper.set_consumer_offset(11);
3102 consume_stats
3103 .get_offset_table_mut()
3104 .insert(MessageQueue::from_parts("TopicTest", "broker-a", 1), offset_wrapper);
3105
3106 let mut broker_addrs = HashMap::new();
3107 broker_addrs.insert(mix_all::MASTER_ID, CheetahString::from("127.0.0.1:10911"));
3108 let broker_data = BrokerData::new(
3109 CheetahString::from("cluster-a"),
3110 CheetahString::from("broker-a"),
3111 broker_addrs,
3112 None,
3113 );
3114 let cluster_info = ClusterInfo::new(
3115 Some(HashMap::from([(CheetahString::from("broker-a"), broker_data)])),
3116 None,
3117 );
3118
3119 assert!(is_message_consumed(&message_ext, &consume_stats, &cluster_info));
3120 }
3121
3122 #[test]
3123 fn parse_response_code_from_message_reads_consumer_not_online_code() {
3124 let code = parse_response_code_from_message("CODE: 206 DESC: Not found the consumer group connection");
3125
3126 assert_eq!(code, Some(ResponseCode::ConsumerNotOnline));
3127 }
3128}