rocketmq_admin_core/core/namesrv/
operations.rs1use std::collections::HashMap;
18
19use cheetah_string::CheetahString;
20use rocketmq_client_rust::admin::mq_admin_ext_async::MQAdminExt;
21
22use crate::admin::default_mq_admin_ext::DefaultMQAdminExt;
23use crate::core::RocketMQError;
24use crate::core::RocketMQResult;
25use crate::core::ToolsError;
26
/// Stateless entry point for NameServer administration helpers.
///
/// The type carries no data: every operation is an associated function that
/// receives the admin client explicitly, so no instance ever needs to be kept.
/// The derives make the marker printable, freely copyable and
/// default-constructible at no cost.
#[derive(Debug, Clone, Copy, Default)]
pub struct NameServerService;
29
30impl NameServerService {
31 pub async fn get_namesrv_config(
40 admin: &mut DefaultMQAdminExt,
41 nameserver_addrs: Vec<CheetahString>,
42 ) -> RocketMQResult<HashMap<CheetahString, HashMap<CheetahString, CheetahString>>> {
43 admin
44 .get_name_server_config(nameserver_addrs)
45 .await
46 .map_err(|e| RocketMQError::Tools(ToolsError::nameserver_config_invalid(e.to_string())))
47 }
48
49 pub async fn update_namesrv_config(
59 admin: &mut DefaultMQAdminExt,
60 properties: HashMap<CheetahString, CheetahString>,
61 nameserver_addrs: Option<Vec<CheetahString>>,
62 ) -> RocketMQResult<()> {
63 admin
64 .update_name_server_config(properties, nameserver_addrs)
65 .await
66 .map_err(|e| RocketMQError::Tools(ToolsError::nameserver_config_invalid(e.to_string())))
67 }
68
69 pub async fn create_or_update_kv_config(
80 admin: &mut DefaultMQAdminExt,
81 namespace: impl Into<CheetahString>,
82 key: impl Into<CheetahString>,
83 value: impl Into<CheetahString>,
84 ) -> RocketMQResult<()> {
85 let namespace = namespace.into();
86 let key = key.into();
87 let value = value.into();
88
89 admin
90 .create_and_update_kv_config(namespace.clone(), key.clone(), value)
91 .await
92 .map_err(|e| {
93 RocketMQError::Tools(ToolsError::nameserver_config_invalid(format!(
94 "Failed to create/update KV config [{namespace}:{key}]: {e}"
95 )))
96 })
97 }
98
99 pub async fn delete_kv_config(
109 admin: &mut DefaultMQAdminExt,
110 namespace: impl Into<CheetahString>,
111 key: impl Into<CheetahString>,
112 ) -> RocketMQResult<()> {
113 let namespace = namespace.into();
114 let key = key.into();
115
116 admin
117 .delete_kv_config(namespace.clone(), key.clone())
118 .await
119 .map_err(|e| {
120 RocketMQError::Tools(ToolsError::nameserver_config_invalid(format!(
121 "Failed to delete KV config [{namespace}:{key}]: {e}"
122 )))
123 })
124 }
125
126 pub async fn add_write_perm_of_broker(
136 admin: &mut DefaultMQAdminExt,
137 namesrv_addr: impl Into<CheetahString>,
138 broker_name: impl Into<CheetahString>,
139 ) -> RocketMQResult<i32> {
140 let namesrv = namesrv_addr.into();
141 let broker = broker_name.into();
142
143 admin
144 .add_write_perm_of_broker(namesrv.clone(), broker.clone())
145 .await
146 .map_err(|e| {
147 RocketMQError::Tools(ToolsError::broker_not_found(format!(
148 "Failed to add write permission for broker '{broker}' on NameServer '{namesrv}': {e}"
149 )))
150 })
151 }
152
153 pub async fn wipe_write_perm_of_broker(
163 admin: &mut DefaultMQAdminExt,
164 namesrv_addr: impl Into<CheetahString>,
165 broker_name: impl Into<CheetahString>,
166 ) -> RocketMQResult<i32> {
167 let namesrv = namesrv_addr.into();
168 let broker = broker_name.into();
169
170 admin
171 .wipe_write_perm_of_broker(namesrv.clone(), broker.clone())
172 .await
173 .map_err(|e| {
174 RocketMQError::Tools(ToolsError::broker_not_found(format!(
175 "Failed to wipe write permission for broker '{broker}' on NameServer '{namesrv}': {e}"
176 )))
177 })
178 }
179}
180
#[cfg(test)]
mod tests {
    use super::NameServerService;

    /// Smoke test: the service marker can be constructed as a plain value.
    ///
    /// The async operations need a live admin client, so they are not
    /// exercised here.
    #[test]
    fn test_namesrv_service_exists() {
        let _instance: NameServerService = NameServerService;
    }
}