rocketmq_admin_core/core/admin.rs
1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Admin client builder and RAII resource management
16//!
17//! This module provides ergonomic patterns for managing admin clients:
18//! - [`AdminBuilder`] - Fluent builder pattern for client configuration
19//! - [`AdminGuard`] - RAII wrapper for automatic resource cleanup
20
21use rocketmq_client_rust::admin::mq_admin_ext_async::MQAdminExt;
22use rocketmq_common::TimeUtils::current_millis;
23
24use crate::admin::default_mq_admin_ext::DefaultMQAdminExt;
25use crate::core::RocketMQResult;
26
/// Fluent builder that collects configuration for an admin client.
///
/// Every option is optional; an option that is never set stays `None`.
/// The collected values are consumed by [`AdminBuilder::build_and_start`] or
/// [`AdminBuilder::build_with_guard`].
///
/// Note: `timeout_millis` and `unit_name` are recorded by the builder but are
/// not currently forwarded to the client when it is built.
///
/// # Examples
///
/// ```rust,ignore
/// use rocketmq_admin_core::core::admin::AdminBuilder;
///
/// // Simple usage
/// let admin = AdminBuilder::new()
///     .namesrv_addr("127.0.0.1:9876")
///     .build_and_start()
///     .await?;
///
/// // With custom configuration
/// let admin = AdminBuilder::new()
///     .namesrv_addr("127.0.0.1:9876;127.0.0.1:9877")
///     .instance_name("my-admin-tool")
///     .timeout_millis(5000)
///     .build_and_start()
///     .await?;
/// ```
#[must_use = "a builder does nothing until `build_and_start` or `build_with_guard` is called"]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct AdminBuilder {
    /// NameServer address string; several addresses are separated by `;`.
    namesrv_addr: Option<String>,
    /// Client instance name; when `None`, a `"tools-{timestamp}"` name is generated at build time.
    instance_name: Option<String>,
    /// Requested timeout in milliseconds (stored only, not applied yet).
    timeout_millis: Option<u64>,
    /// Requested unit name (stored only, not applied yet).
    unit_name: Option<String>,
}
55
56impl AdminBuilder {
57 /// Create a new builder with default configuration
58 #[inline]
59 pub fn new() -> Self {
60 Self::default()
61 }
62
63 /// Set NameServer address
64 ///
65 /// Supports multiple addresses separated by semicolon:
66 /// - Single: `"127.0.0.1:9876"`
67 /// - Multiple: `"127.0.0.1:9876;127.0.0.1:9877"`
68 #[inline]
69 pub fn namesrv_addr(mut self, addr: impl Into<String>) -> Self {
70 self.namesrv_addr = Some(addr.into());
71 self
72 }
73
74 /// Set custom instance name
75 ///
76 /// If not set, defaults to `"tools-{timestamp}"`
77 #[inline]
78 pub fn instance_name(mut self, name: impl Into<String>) -> Self {
79 self.instance_name = Some(name.into());
80 self
81 }
82
83 /// Set timeout in milliseconds
84 #[inline]
85 pub fn timeout_millis(mut self, timeout: u64) -> Self {
86 self.timeout_millis = Some(timeout);
87 self
88 }
89
90 /// Set unit name for namespace isolation
91 #[inline]
92 pub fn unit_name(mut self, name: impl Into<String>) -> Self {
93 self.unit_name = Some(name.into());
94 self
95 }
96
97 /// Build and start the admin client
98 ///
99 /// This will:
100 /// 1. Create a new DefaultMQAdminExt instance
101 /// 2. Apply all configuration
102 /// 3. Start the client (establish connections)
103 ///
104 /// # Errors
105 ///
106 /// Returns error if:
107 /// - NameServer address is invalid
108 /// - Connection cannot be established
109 /// - Network I/O fails
110 pub async fn build_and_start(self) -> RocketMQResult<DefaultMQAdminExt> {
111 let mut admin = DefaultMQAdminExt::new();
112
113 // Apply NameServer address
114 if let Some(addr) = self.namesrv_addr {
115 admin.set_namesrv_addr(&addr);
116 }
117
118 // Apply instance name (default: "tools-{timestamp}")
119 let instance_name = self
120 .instance_name
121 .unwrap_or_else(|| format!("tools-{}", current_millis()));
122 admin.client_config_mut().set_instance_name(instance_name.into());
123
124 // Note: timeout_millis and unit_name are stored but not currently applied
125 // as the corresponding setter methods are not available in ClientConfig
126
127 // Start the admin client
128 admin.start().await?;
129
130 Ok(admin)
131 }
132
133 /// Build the admin client with RAII auto-cleanup
134 ///
135 /// Returns an [`AdminGuard`] that automatically calls shutdown when dropped.
136 ///
137 /// # Examples
138 ///
139 /// ```rust,ignore
140 /// {
141 /// let admin = AdminBuilder::new()
142 /// .namesrv_addr("127.0.0.1:9876")
143 /// .build_with_guard()
144 /// .await?;
145 ///
146 /// // Use admin...
147 /// let clusters = TopicService::get_topic_cluster_list(&admin, "MyTopic").await?;
148 /// } // admin automatically cleaned up here
149 /// ```
150 pub async fn build_with_guard(self) -> RocketMQResult<AdminGuard> {
151 let admin = self.build_and_start().await?;
152 Ok(AdminGuard::new(admin))
153 }
154}
155
156/// RAII guard for automatic admin client cleanup
157///
158/// This wrapper ensures that the admin client is properly shut down when
159/// it goes out of scope, preventing resource leaks and dangling connections.
160///
161/// # Examples
162///
163/// ```rust,ignore
164/// use rocketmq_admin_core::core::admin::{AdminBuilder, AdminGuard};
165///
166/// async fn process_topics() -> RocketMQResult<()> {
167/// let admin = AdminBuilder::new()
168/// .namesrv_addr("127.0.0.1:9876")
169/// .build_with_guard()
170/// .await?;
171///
172/// // Use admin through Deref
173/// let result = admin.examine_topic_route_info("MyTopic").await?;
174///
175/// Ok(())
176/// } // admin.shutdown() called automatically here
177/// ```
178pub struct AdminGuard {
179 admin: Option<DefaultMQAdminExt>,
180 runtime: tokio::runtime::Handle,
181}
182
183impl AdminGuard {
184 /// Create a new guard wrapping an admin client
185 fn new(admin: DefaultMQAdminExt) -> Self {
186 Self {
187 admin: Some(admin),
188 runtime: tokio::runtime::Handle::current(),
189 }
190 }
191
192 /// Manually shutdown the admin client
193 ///
194 /// This consumes the guard and prevents the automatic Drop shutdown.
195 pub async fn shutdown(mut self) {
196 if let Some(mut admin) = self.admin.take() {
197 admin.shutdown().await;
198 }
199 }
200
201 /// Get a reference to the inner admin client
202 #[inline]
203 pub fn inner(&self) -> &DefaultMQAdminExt {
204 self.admin.as_ref().expect("AdminGuard already consumed")
205 }
206
207 /// Get a mutable reference to the inner admin client
208 #[inline]
209 pub fn inner_mut(&mut self) -> &mut DefaultMQAdminExt {
210 self.admin.as_mut().expect("AdminGuard already consumed")
211 }
212}
213
214impl std::ops::Deref for AdminGuard {
215 type Target = DefaultMQAdminExt;
216
217 fn deref(&self) -> &Self::Target {
218 self.inner()
219 }
220}
221
222impl std::ops::DerefMut for AdminGuard {
223 fn deref_mut(&mut self) -> &mut Self::Target {
224 self.inner_mut()
225 }
226}
227
228impl Drop for AdminGuard {
229 fn drop(&mut self) {
230 if let Some(mut admin) = self.admin.take() {
231 // Spawn shutdown on the runtime
232 // Note: This is best-effort cleanup; errors are ignored
233 let runtime = self.runtime.clone();
234 runtime.spawn(async move {
235 let _ = admin.shutdown().await;
236 });
237 }
238 }
239}
240
241// Allow using AdminGuard where DefaultMQAdminExt is expected
242impl AsRef<DefaultMQAdminExt> for AdminGuard {
243 fn as_ref(&self) -> &DefaultMQAdminExt {
244 self.inner()
245 }
246}
247
248impl AsMut<DefaultMQAdminExt> for AdminGuard {
249 fn as_mut(&mut self) -> &mut DefaultMQAdminExt {
250 self.inner_mut()
251 }
252}
253
254/// Helper function to create a simple admin client with just address
255///
256/// Equivalent to:
257/// ```rust,ignore
258/// AdminBuilder::new()
259/// .namesrv_addr(addr)
260/// .build_and_start()
261/// .await
262/// ```
263#[inline]
264pub async fn create_admin(namesrv_addr: impl Into<String>) -> RocketMQResult<DefaultMQAdminExt> {
265 AdminBuilder::new().namesrv_addr(namesrv_addr).build_and_start().await
266}
267
268/// Helper function to create an admin client with RAII guard
269///
270/// Equivalent to:
271/// ```rust,ignore
272/// AdminBuilder::new()
273/// .namesrv_addr(addr)
274/// .build_with_guard()
275/// .await
276/// ```
277#[inline]
278pub async fn create_admin_with_guard(namesrv_addr: impl Into<String>) -> RocketMQResult<AdminGuard> {
279 AdminBuilder::new().namesrv_addr(namesrv_addr).build_with_guard().await
280}
281
#[cfg(test)]
mod tests {
    use super::*;

    /// Each setter stores its argument in the matching field.
    #[test]
    fn test_builder_configuration() {
        let configured = AdminBuilder::new()
            .namesrv_addr("127.0.0.1:9876")
            .instance_name("test-instance")
            .timeout_millis(5000)
            .unit_name("test-unit");

        assert_eq!(configured.namesrv_addr.as_deref(), Some("127.0.0.1:9876"));
        assert_eq!(configured.instance_name.as_deref(), Some("test-instance"));
        assert_eq!(configured.timeout_millis, Some(5000));
        assert_eq!(configured.unit_name.as_deref(), Some("test-unit"));
    }

    /// A default builder has no option set.
    #[test]
    fn test_builder_default() {
        let AdminBuilder {
            namesrv_addr,
            instance_name,
            timeout_millis,
            unit_name,
        } = AdminBuilder::default();

        assert!(namesrv_addr.is_none());
        assert!(instance_name.is_none());
        assert!(timeout_millis.is_none());
        assert!(unit_name.is_none());
    }

    /// Setting the same option twice keeps the last value.
    #[test]
    fn test_builder_chaining() {
        let overridden = AdminBuilder::new()
            .namesrv_addr("addr1")
            .namesrv_addr("addr2");

        assert_eq!(overridden.namesrv_addr.as_deref(), Some("addr2"));
    }

    /// A semicolon-separated address list is stored verbatim.
    #[test]
    fn test_builder_multiple_namesrv() {
        let address_list = "127.0.0.1:9876;127.0.0.1:9877;127.0.0.1:9878";
        let configured = AdminBuilder::new().namesrv_addr(address_list);

        assert_eq!(configured.namesrv_addr, Some(address_list.to_string()));
    }

    /// Options that were not set remain `None`.
    #[test]
    fn test_builder_partial_config() {
        let partial = AdminBuilder::new().namesrv_addr("127.0.0.1:9876");

        assert_eq!(partial.namesrv_addr.as_deref(), Some("127.0.0.1:9876"));
        assert!(partial.instance_name.is_none());
        assert!(partial.timeout_millis.is_none());
    }

    /// Owned and borrowed strings are accepted and produce the same result.
    #[test]
    fn test_builder_from_string_types() {
        let from_owned = AdminBuilder::new().namesrv_addr(String::from("127.0.0.1:9876"));
        let from_borrowed = AdminBuilder::new().namesrv_addr("127.0.0.1:9876");

        assert_eq!(from_owned.namesrv_addr, from_borrowed.namesrv_addr);
    }

    /// All setters return `Self`, so calls can be chained and overridden.
    #[test]
    fn test_builder_fluent_api() {
        let chained = AdminBuilder::new()
            .namesrv_addr("addr")
            .instance_name("name")
            .timeout_millis(1000)
            .unit_name("unit")
            .namesrv_addr("new_addr");

        assert_eq!(chained.namesrv_addr.as_deref(), Some("new_addr"));
    }
}