admin_builder_pattern/
admin_builder_pattern.rs1#![recursion_limit = "256"]
16
17use rocketmq_admin_core::core::admin::create_admin_with_guard;
25use rocketmq_admin_core::core::admin::AdminBuilder;
26use rocketmq_admin_core::core::topic::TopicService;
27use rocketmq_admin_core::core::RocketMQResult;
28use rocketmq_client_rust::admin::mq_admin_ext_async::MQAdminExt;
29
30async fn example_simple() -> RocketMQResult<()> {
32 println!("=== Example 1: Simple Usage ===");
33
34 let mut admin = create_admin_with_guard("127.0.0.1:9876").await?;
36
37 let clusters = TopicService::get_topic_cluster_list(&mut admin, "TestTopic").await?;
39
40 println!("Topic clusters: {:?}", clusters.clusters);
41
42 Ok(())
44}
45
46async fn example_builder() -> RocketMQResult<()> {
48 println!("\n=== Example 2: Builder Pattern ===");
49
50 let admin = AdminBuilder::new()
51 .namesrv_addr("127.0.0.1:9876")
52 .instance_name("example-admin")
53 .timeout_millis(5000)
54 .build_with_guard()
55 .await?;
56
57 let route = admin.examine_topic_route_info("TestTopic".into()).await?;
59
60 if let Some(route_data) = route {
61 println!("Queue data count: {}", route_data.queue_datas.len());
62 println!("Broker data count: {}", route_data.broker_datas.len());
63 } else {
64 println!("No route data found for TestTopic");
65 }
66
67 Ok(())
68}
69
70async fn example_multiple_namesrv() -> RocketMQResult<()> {
72 println!("\n=== Example 3: Multiple NameServers ===");
73
74 let result = AdminBuilder::new()
76 .namesrv_addr("127.0.0.1:9876;127.0.0.1:9877")
77 .build_with_guard()
78 .await;
79
80 let _admin = match result {
81 Ok(admin) => {
82 println!("Connected to primary NameServer");
83 admin
84 }
85 Err(e) => {
86 eprintln!("Primary NameServer failed: {e}");
87 eprintln!("Falling back to backup...");
88
89 AdminBuilder::new()
91 .namesrv_addr("127.0.0.1:9878")
92 .build_with_guard()
93 .await?
94 }
95 };
96
97 println!("Admin client ready");
98
99 Ok(())
100}
101
102async fn example_early_return(topic: &str) -> RocketMQResult<()> {
104 println!("\n=== Example 4: Early Return ===");
105
106 let admin = AdminBuilder::new()
107 .namesrv_addr("127.0.0.1:9876")
108 .build_with_guard()
109 .await?;
110
111 let route = admin.examine_topic_route_info(topic.into()).await?;
113
114 let Some(route_data) = route else {
116 println!("Topic '{}' not found, exiting early", topic);
117 return Ok(());
118 };
119
120 if route_data.queue_datas.is_empty() {
122 println!("Topic '{}' has no queues, exiting early", topic);
123 return Ok(());
124 }
125
126 println!("Topic '{}' has {} queues", topic, route_data.queue_datas.len());
127
128 for queue in &route_data.queue_datas {
130 println!(
131 " Broker: {}, ReadQueueNums: {}, WriteQueueNums: {}",
132 queue.broker_name, queue.read_queue_nums, queue.write_queue_nums
133 );
134 }
135
136 Ok(())
137}
138
139async fn example_explicit_shutdown() -> RocketMQResult<()> {
141 println!("\n=== Example 5: Explicit Shutdown ===");
142
143 let admin = AdminBuilder::new()
144 .namesrv_addr("127.0.0.1:9876")
145 .instance_name("explicit-shutdown-example")
146 .build_with_guard()
147 .await?;
148
149 println!("Admin client started successfully");
151
152 println!("Shutting down admin client...");
154 admin.shutdown().await;
155 println!("Shutdown complete");
156
157 Ok(())
158}
159
160async fn example_from_environment() -> RocketMQResult<()> {
162 println!("\n=== Example 6: Environment Configuration ===");
163
164 use std::env;
165
166 let namesrv_addr = env::var("NAMESRV_ADDR").unwrap_or_else(|_| "127.0.0.1:9876".to_string());
168
169 let instance_name = env::var("INSTANCE_NAME").unwrap_or_else(|_| "env-admin".to_string());
170
171 println!("NameServer: {}", namesrv_addr);
172 println!("Instance: {}", instance_name);
173
174 let _admin = AdminBuilder::new()
175 .namesrv_addr(namesrv_addr)
176 .instance_name(instance_name)
177 .build_with_guard()
178 .await?;
179
180 println!("Admin configured from environment");
181
182 Ok(())
183}
184
185async fn example_manual_cleanup() -> RocketMQResult<()> {
187 println!("\n=== Example 7: Manual Cleanup ===");
188
189 let mut admin = AdminBuilder::new()
191 .namesrv_addr("127.0.0.1:9876")
192 .build_and_start() .await?;
194
195 println!("Admin started");
197
198 admin.shutdown().await;
200 println!("Manually shut down");
201
202 Ok(())
203}
204
205#[tokio::main]
206async fn main() -> RocketMQResult<()> {
207 println!("RocketMQ Admin Builder & RAII Examples\n");
208 println!("Note: These examples require a running RocketMQ NameServer");
209 println!("======================================================\n");
210
211 if let Err(e) = example_simple().await {
213 eprintln!("Example 1 failed: {e}");
214 }
215
216 if let Err(e) = example_builder().await {
217 eprintln!("Example 2 failed: {e}");
218 }
219
220 if let Err(e) = example_multiple_namesrv().await {
221 eprintln!("Example 3 failed: {e}");
222 }
223
224 if let Err(e) = example_early_return("TestTopic").await {
225 eprintln!("Example 4 failed: {e}");
226 }
227
228 if let Err(e) = example_explicit_shutdown().await {
229 eprintln!("Example 5 failed: {e}");
230 }
231
232 if let Err(e) = example_from_environment().await {
233 eprintln!("Example 6 failed: {e}");
234 }
235
236 if let Err(e) = example_manual_cleanup().await {
237 eprintln!("Example 7 failed: {e}");
238 }
239
240 println!("\n======================================================");
241 println!("Examples completed!");
242
243 Ok(())
244}