Skip to main content

rocketmq_admin_core/core/
admin.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Admin client builder and RAII resource management
16//!
17//! This module provides ergonomic patterns for managing admin clients:
18//! - [`AdminBuilder`] - Fluent builder pattern for client configuration
19//! - [`AdminGuard`] - RAII wrapper for automatic resource cleanup
20
21use rocketmq_client_rust::admin::mq_admin_ext_async::MQAdminExt;
22use rocketmq_common::TimeUtils::current_millis;
23
24use crate::admin::default_mq_admin_ext::DefaultMQAdminExt;
25use crate::core::RocketMQResult;
26
27/// Builder for creating and configuring admin clients
28///
29/// # Examples
30///
31/// ```rust,ignore
32/// use rocketmq_admin_core::core::admin::AdminBuilder;
33///
34/// // Simple usage
35/// let admin = AdminBuilder::new()
36///     .namesrv_addr("127.0.0.1:9876")
37///     .build_and_start()
38///     .await?;
39///
40/// // With custom configuration
41/// let admin = AdminBuilder::new()
42///     .namesrv_addr("127.0.0.1:9876;127.0.0.1:9877")
43///     .instance_name("my-admin-tool")
44///     .timeout_millis(5000)
45///     .build_and_start()
46///     .await?;
47/// ```
48#[derive(Debug, Clone, Default)]
49pub struct AdminBuilder {
50    namesrv_addr: Option<String>,
51    instance_name: Option<String>,
52    timeout_millis: Option<u64>,
53    unit_name: Option<String>,
54}
55
56impl AdminBuilder {
57    /// Create a new builder with default configuration
58    #[inline]
59    pub fn new() -> Self {
60        Self::default()
61    }
62
63    /// Set NameServer address
64    ///
65    /// Supports multiple addresses separated by semicolon:
66    /// - Single: `"127.0.0.1:9876"`
67    /// - Multiple: `"127.0.0.1:9876;127.0.0.1:9877"`
68    #[inline]
69    pub fn namesrv_addr(mut self, addr: impl Into<String>) -> Self {
70        self.namesrv_addr = Some(addr.into());
71        self
72    }
73
74    /// Set custom instance name
75    ///
76    /// If not set, defaults to `"tools-{timestamp}"`
77    #[inline]
78    pub fn instance_name(mut self, name: impl Into<String>) -> Self {
79        self.instance_name = Some(name.into());
80        self
81    }
82
83    /// Set timeout in milliseconds
84    #[inline]
85    pub fn timeout_millis(mut self, timeout: u64) -> Self {
86        self.timeout_millis = Some(timeout);
87        self
88    }
89
90    /// Set unit name for namespace isolation
91    #[inline]
92    pub fn unit_name(mut self, name: impl Into<String>) -> Self {
93        self.unit_name = Some(name.into());
94        self
95    }
96
97    /// Build and start the admin client
98    ///
99    /// This will:
100    /// 1. Create a new DefaultMQAdminExt instance
101    /// 2. Apply all configuration
102    /// 3. Start the client (establish connections)
103    ///
104    /// # Errors
105    ///
106    /// Returns error if:
107    /// - NameServer address is invalid
108    /// - Connection cannot be established
109    /// - Network I/O fails
110    pub async fn build_and_start(self) -> RocketMQResult<DefaultMQAdminExt> {
111        let mut admin = DefaultMQAdminExt::new();
112
113        // Apply NameServer address
114        if let Some(addr) = self.namesrv_addr {
115            admin.set_namesrv_addr(&addr);
116        }
117
118        // Apply instance name (default: "tools-{timestamp}")
119        let instance_name = self
120            .instance_name
121            .unwrap_or_else(|| format!("tools-{}", current_millis()));
122        admin.client_config_mut().set_instance_name(instance_name.into());
123
124        // Note: timeout_millis and unit_name are stored but not currently applied
125        // as the corresponding setter methods are not available in ClientConfig
126
127        // Start the admin client
128        admin.start().await?;
129
130        Ok(admin)
131    }
132
133    /// Build the admin client with RAII auto-cleanup
134    ///
135    /// Returns an [`AdminGuard`] that automatically calls shutdown when dropped.
136    ///
137    /// # Examples
138    ///
139    /// ```rust,ignore
140    /// {
141    ///     let admin = AdminBuilder::new()
142    ///         .namesrv_addr("127.0.0.1:9876")
143    ///         .build_with_guard()
144    ///         .await?;
145    ///
146    ///     // Use admin...
147    ///     let clusters = TopicService::get_topic_cluster_list(&admin, "MyTopic").await?;
148    /// } // admin automatically cleaned up here
149    /// ```
150    pub async fn build_with_guard(self) -> RocketMQResult<AdminGuard> {
151        let admin = self.build_and_start().await?;
152        Ok(AdminGuard::new(admin))
153    }
154}
155
156/// RAII guard for automatic admin client cleanup
157///
158/// This wrapper ensures that the admin client is properly shut down when
159/// it goes out of scope, preventing resource leaks and dangling connections.
160///
161/// # Examples
162///
163/// ```rust,ignore
164/// use rocketmq_admin_core::core::admin::{AdminBuilder, AdminGuard};
165///
166/// async fn process_topics() -> RocketMQResult<()> {
167///     let admin = AdminBuilder::new()
168///         .namesrv_addr("127.0.0.1:9876")
169///         .build_with_guard()
170///         .await?;
171///
172///     // Use admin through Deref
173///     let result = admin.examine_topic_route_info("MyTopic").await?;
174///
175///     Ok(())
176/// } // admin.shutdown() called automatically here
177/// ```
178pub struct AdminGuard {
179    admin: Option<DefaultMQAdminExt>,
180    runtime: tokio::runtime::Handle,
181}
182
183impl AdminGuard {
184    /// Create a new guard wrapping an admin client
185    fn new(admin: DefaultMQAdminExt) -> Self {
186        Self {
187            admin: Some(admin),
188            runtime: tokio::runtime::Handle::current(),
189        }
190    }
191
192    /// Manually shutdown the admin client
193    ///
194    /// This consumes the guard and prevents the automatic Drop shutdown.
195    pub async fn shutdown(mut self) {
196        if let Some(mut admin) = self.admin.take() {
197            admin.shutdown().await;
198        }
199    }
200
201    /// Get a reference to the inner admin client
202    #[inline]
203    pub fn inner(&self) -> &DefaultMQAdminExt {
204        self.admin.as_ref().expect("AdminGuard already consumed")
205    }
206
207    /// Get a mutable reference to the inner admin client
208    #[inline]
209    pub fn inner_mut(&mut self) -> &mut DefaultMQAdminExt {
210        self.admin.as_mut().expect("AdminGuard already consumed")
211    }
212}
213
214impl std::ops::Deref for AdminGuard {
215    type Target = DefaultMQAdminExt;
216
217    fn deref(&self) -> &Self::Target {
218        self.inner()
219    }
220}
221
222impl std::ops::DerefMut for AdminGuard {
223    fn deref_mut(&mut self) -> &mut Self::Target {
224        self.inner_mut()
225    }
226}
227
228impl Drop for AdminGuard {
229    fn drop(&mut self) {
230        if let Some(mut admin) = self.admin.take() {
231            // Spawn shutdown on the runtime
232            // Note: This is best-effort cleanup; errors are ignored
233            let runtime = self.runtime.clone();
234            runtime.spawn(async move {
235                let _ = admin.shutdown().await;
236            });
237        }
238    }
239}
240
241// Allow using AdminGuard where DefaultMQAdminExt is expected
242impl AsRef<DefaultMQAdminExt> for AdminGuard {
243    fn as_ref(&self) -> &DefaultMQAdminExt {
244        self.inner()
245    }
246}
247
248impl AsMut<DefaultMQAdminExt> for AdminGuard {
249    fn as_mut(&mut self) -> &mut DefaultMQAdminExt {
250        self.inner_mut()
251    }
252}
253
254/// Helper function to create a simple admin client with just address
255///
256/// Equivalent to:
257/// ```rust,ignore
258/// AdminBuilder::new()
259///     .namesrv_addr(addr)
260///     .build_and_start()
261///     .await
262/// ```
263#[inline]
264pub async fn create_admin(namesrv_addr: impl Into<String>) -> RocketMQResult<DefaultMQAdminExt> {
265    AdminBuilder::new().namesrv_addr(namesrv_addr).build_and_start().await
266}
267
268/// Helper function to create an admin client with RAII guard
269///
270/// Equivalent to:
271/// ```rust,ignore
272/// AdminBuilder::new()
273///     .namesrv_addr(addr)
274///     .build_with_guard()
275///     .await
276/// ```
277#[inline]
278pub async fn create_admin_with_guard(namesrv_addr: impl Into<String>) -> RocketMQResult<AdminGuard> {
279    AdminBuilder::new().namesrv_addr(namesrv_addr).build_with_guard().await
280}
281
282#[cfg(test)]
283mod tests {
284    use super::*;
285
286    #[test]
287    fn test_builder_configuration() {
288        let builder = AdminBuilder::new()
289            .namesrv_addr("127.0.0.1:9876")
290            .instance_name("test-instance")
291            .timeout_millis(5000)
292            .unit_name("test-unit");
293
294        assert_eq!(builder.namesrv_addr, Some("127.0.0.1:9876".to_string()));
295        assert_eq!(builder.instance_name, Some("test-instance".to_string()));
296        assert_eq!(builder.timeout_millis, Some(5000));
297        assert_eq!(builder.unit_name, Some("test-unit".to_string()));
298    }
299
300    #[test]
301    fn test_builder_default() {
302        let builder = AdminBuilder::default();
303        assert!(builder.namesrv_addr.is_none());
304        assert!(builder.instance_name.is_none());
305        assert!(builder.timeout_millis.is_none());
306        assert!(builder.unit_name.is_none());
307    }
308
309    #[test]
310    fn test_builder_chaining() {
311        let builder = AdminBuilder::new().namesrv_addr("addr1").namesrv_addr("addr2"); // Should override
312
313        assert_eq!(builder.namesrv_addr, Some("addr2".to_string()));
314    }
315
316    #[test]
317    fn test_builder_multiple_namesrv() {
318        let builder = AdminBuilder::new().namesrv_addr("127.0.0.1:9876;127.0.0.1:9877;127.0.0.1:9878");
319
320        assert_eq!(
321            builder.namesrv_addr,
322            Some("127.0.0.1:9876;127.0.0.1:9877;127.0.0.1:9878".to_string())
323        );
324    }
325
326    #[test]
327    fn test_builder_partial_config() {
328        let builder = AdminBuilder::new().namesrv_addr("127.0.0.1:9876");
329
330        assert_eq!(builder.namesrv_addr, Some("127.0.0.1:9876".to_string()));
331        assert!(builder.instance_name.is_none());
332        assert!(builder.timeout_millis.is_none());
333    }
334
335    #[test]
336    fn test_builder_from_string_types() {
337        let owned = String::from("127.0.0.1:9876");
338        let builder1 = AdminBuilder::new().namesrv_addr(owned);
339
340        let borrowed = "127.0.0.1:9876";
341        let builder2 = AdminBuilder::new().namesrv_addr(borrowed);
342
343        assert_eq!(builder1.namesrv_addr, builder2.namesrv_addr);
344    }
345
346    #[test]
347    fn test_builder_fluent_api() {
348        // Test that all methods return Self for chaining
349        let builder = AdminBuilder::new()
350            .namesrv_addr("addr")
351            .instance_name("name")
352            .timeout_millis(1000)
353            .unit_name("unit")
354            .namesrv_addr("new_addr"); // Can override
355
356        assert_eq!(builder.namesrv_addr, Some("new_addr".to_string()));
357    }
358}