rocketmq_client_rust/consumer/
default_mq_push_consumer_builder.rs1use std::collections::HashMap;
18use std::sync::Arc;
19
20use cheetah_string::CheetahString;
21use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere;
22use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
23use rocketmq_remoting::runtime::RPCHook;
24use rocketmq_rust::ArcMut;
25
26use crate::base::client_config::ClientConfig;
27use crate::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy;
28use crate::consumer::default_mq_push_consumer::ConsumerConfig;
29use crate::consumer::default_mq_push_consumer::DefaultMQPushConsumer;
30use crate::consumer::message_queue_listener::MessageQueueListener;
31use crate::consumer::mq_push_consumer::MQPushConsumer;
32use crate::trace::trace_dispatcher::TraceDispatcher;
33
34pub struct DefaultMQPushConsumerBuilder {
35 client_config: Option<ClientConfig>,
36 consumer_group: Option<CheetahString>,
37 topic_sub_expression: (Option<CheetahString>, Option<CheetahString>),
38 message_model: Option<MessageModel>,
39 consume_from_where: Option<ConsumeFromWhere>,
40 consume_timestamp: Option<CheetahString>,
41 allocate_message_queue_strategy: Option<Arc<dyn AllocateMessageQueueStrategy>>,
42 subscription: Option<ArcMut<HashMap<CheetahString, CheetahString>>>,
43
44 message_queue_listener: Option<Arc<Box<dyn MessageQueueListener>>>,
45
46 consume_thread_min: Option<u32>,
47 consume_thread_max: Option<u32>,
48 adjust_thread_pool_nums_threshold: Option<u64>,
49 consume_concurrently_max_span: Option<u32>,
50 pull_threshold_for_queue: Option<u32>,
51 pop_threshold_for_queue: Option<u32>,
52 pull_threshold_size_for_queue: Option<u32>,
53 pull_threshold_for_topic: Option<i32>,
54 pull_threshold_size_for_topic: Option<i32>,
55 pull_interval: Option<u64>,
56 consume_message_batch_max_size: Option<u32>,
57 pull_batch_size: Option<u32>,
58 pull_batch_size_in_bytes: Option<u32>,
59 post_subscription_when_pull: Option<bool>,
60 unit_mode: Option<bool>,
61 max_reconsume_times: Option<i32>,
62 suspend_current_queue_time_millis: Option<u64>,
63 consume_timeout: Option<u64>,
64 pop_invisible_time: Option<u64>,
65 pop_batch_nums: Option<u32>,
66 await_termination_millis_when_shutdown: Option<u64>,
67 trace_dispatcher: Option<Arc<Box<dyn TraceDispatcher + Send + Sync>>>,
68 client_rebalance: Option<bool>,
69 rpc_hook: Option<Arc<Box<dyn RPCHook>>>,
70}
71
72impl Default for DefaultMQPushConsumerBuilder {
73 fn default() -> Self {
74 Self {
75 client_config: Some(Default::default()),
76 consumer_group: None,
77 topic_sub_expression: (None, None),
78 message_model: None,
79 consume_from_where: None,
80 consume_timestamp: None,
81 allocate_message_queue_strategy: None,
82 subscription: None,
83 message_queue_listener: None,
84 consume_thread_min: None,
85 consume_thread_max: None,
86 adjust_thread_pool_nums_threshold: None,
87 consume_concurrently_max_span: None,
88 pull_threshold_for_queue: None,
89 pop_threshold_for_queue: None,
90 pull_threshold_size_for_queue: None,
91 pull_threshold_for_topic: None,
92 pull_threshold_size_for_topic: None,
93 pull_interval: None,
94 consume_message_batch_max_size: None,
95 pull_batch_size: None,
96 pull_batch_size_in_bytes: None,
97 post_subscription_when_pull: None,
98 unit_mode: None,
99 max_reconsume_times: None,
100 suspend_current_queue_time_millis: None,
101 consume_timeout: None,
102 pop_invisible_time: None,
103 pop_batch_nums: None,
104 await_termination_millis_when_shutdown: None,
105 trace_dispatcher: None,
106 client_rebalance: None,
107 rpc_hook: None,
108 }
109 }
110}
111
112impl DefaultMQPushConsumerBuilder {
113 pub fn name_server_addr(mut self, name_server_addr: impl Into<CheetahString>) -> Self {
114 if let Some(client_config) = self.client_config.as_mut() {
115 client_config.namesrv_addr = Some(name_server_addr.into());
116 client_config
117 .namespace_initialized
118 .store(false, std::sync::atomic::Ordering::Release);
119 }
120 self
121 }
122
123 pub fn client_config(mut self, client_config: ClientConfig) -> Self {
124 self.client_config = Some(client_config);
125 self
126 }
127
128 pub fn consumer_group(mut self, consumer_group: impl Into<CheetahString>) -> Self {
130 self.consumer_group = Some(consumer_group.into());
131 self
132 }
133
134 pub fn message_model(mut self, message_model: MessageModel) -> Self {
135 self.message_model = Some(message_model);
136 self
137 }
138
139 pub fn consume_from_where(mut self, consume_from_where: ConsumeFromWhere) -> Self {
140 self.consume_from_where = Some(consume_from_where);
141 self
142 }
143
144 pub fn consume_timestamp(mut self, consume_timestamp: impl Into<CheetahString>) -> Self {
145 self.consume_timestamp = Some(consume_timestamp.into());
146 self
147 }
148
149 pub fn allocate_message_queue_strategy(
150 mut self,
151 allocate_message_queue_strategy: Arc<dyn AllocateMessageQueueStrategy>,
152 ) -> Self {
153 self.allocate_message_queue_strategy = Some(allocate_message_queue_strategy);
154 self
155 }
156
157 pub fn subscribe(
158 mut self,
159 topic: impl Into<CheetahString>,
160 sub_expression: impl Into<CheetahString>,
161 ) -> Self {
162 self.topic_sub_expression.0 = Some(topic.into());
163 self.topic_sub_expression.1 = Some(sub_expression.into());
164 self
165 }
166
167 pub fn message_queue_listener(
176 mut self,
177 message_queue_listener: Option<Arc<Box<dyn MessageQueueListener>>>,
178 ) -> Self {
179 self.message_queue_listener = message_queue_listener;
180 self
181 }
182
183 pub fn consume_thread_min(mut self, consume_thread_min: u32) -> Self {
184 self.consume_thread_min = Some(consume_thread_min);
185 self
186 }
187
188 pub fn consume_thread_max(mut self, consume_thread_max: u32) -> Self {
189 self.consume_thread_max = Some(consume_thread_max);
190 self
191 }
192
193 pub fn adjust_thread_pool_nums_threshold(
194 mut self,
195 adjust_thread_pool_nums_threshold: u64,
196 ) -> Self {
197 self.adjust_thread_pool_nums_threshold = Some(adjust_thread_pool_nums_threshold);
198 self
199 }
200
201 pub fn consume_concurrently_max_span(mut self, consume_concurrently_max_span: u32) -> Self {
202 self.consume_concurrently_max_span = Some(consume_concurrently_max_span);
203 self
204 }
205
206 pub fn pull_threshold_for_queue(mut self, pull_threshold_for_queue: u32) -> Self {
207 self.pull_threshold_for_queue = Some(pull_threshold_for_queue);
208 self
209 }
210
211 pub fn pop_threshold_for_queue(mut self, pop_threshold_for_queue: u32) -> Self {
212 self.pop_threshold_for_queue = Some(pop_threshold_for_queue);
213 self
214 }
215
216 pub fn pull_threshold_size_for_queue(mut self, pull_threshold_size_for_queue: u32) -> Self {
217 self.pull_threshold_size_for_queue = Some(pull_threshold_size_for_queue);
218 self
219 }
220
221 pub fn pull_threshold_for_topic(mut self, pull_threshold_for_topic: i32) -> Self {
222 self.pull_threshold_for_topic = Some(pull_threshold_for_topic);
223 self
224 }
225
226 pub fn pull_threshold_size_for_topic(mut self, pull_threshold_size_for_topic: i32) -> Self {
227 self.pull_threshold_size_for_topic = Some(pull_threshold_size_for_topic);
228 self
229 }
230
231 pub fn pull_interval(mut self, pull_interval: u64) -> Self {
232 self.pull_interval = Some(pull_interval);
233 self
234 }
235
236 pub fn consume_message_batch_max_size(mut self, consume_message_batch_max_size: u32) -> Self {
237 self.consume_message_batch_max_size = Some(consume_message_batch_max_size);
238 self
239 }
240
241 pub fn pull_batch_size(mut self, pull_batch_size: u32) -> Self {
242 self.pull_batch_size = Some(pull_batch_size);
243 self
244 }
245
246 pub fn pull_batch_size_in_bytes(mut self, pull_batch_size_in_bytes: u32) -> Self {
247 self.pull_batch_size_in_bytes = Some(pull_batch_size_in_bytes);
248 self
249 }
250
251 pub fn post_subscription_when_pull(mut self, post_subscription_when_pull: bool) -> Self {
252 self.post_subscription_when_pull = Some(post_subscription_when_pull);
253 self
254 }
255
256 pub fn unit_mode(mut self, unit_mode: bool) -> Self {
257 self.unit_mode = Some(unit_mode);
258 self
259 }
260
261 pub fn max_reconsume_times(mut self, max_reconsume_times: i32) -> Self {
262 self.max_reconsume_times = Some(max_reconsume_times);
263 self
264 }
265
266 pub fn suspend_current_queue_time_millis(
267 mut self,
268 suspend_current_queue_time_millis: u64,
269 ) -> Self {
270 self.suspend_current_queue_time_millis = Some(suspend_current_queue_time_millis);
271 self
272 }
273
274 pub fn consume_timeout(mut self, consume_timeout: u64) -> Self {
275 self.consume_timeout = Some(consume_timeout);
276 self
277 }
278
279 pub fn pop_invisible_time(mut self, pop_invisible_time: u64) -> Self {
280 self.pop_invisible_time = Some(pop_invisible_time);
281 self
282 }
283
284 pub fn pop_batch_nums(mut self, pop_batch_nums: u32) -> Self {
285 self.pop_batch_nums = Some(pop_batch_nums);
286 self
287 }
288
289 pub fn await_termination_millis_when_shutdown(
290 mut self,
291 await_termination_millis_when_shutdown: u64,
292 ) -> Self {
293 self.await_termination_millis_when_shutdown = Some(await_termination_millis_when_shutdown);
294 self
295 }
296
297 pub fn trace_dispatcher(
298 mut self,
299 trace_dispatcher: Option<Arc<Box<dyn TraceDispatcher + Send + Sync>>>,
300 ) -> Self {
301 self.trace_dispatcher = trace_dispatcher;
302 self
303 }
304
305 pub fn client_rebalance(mut self, client_rebalance: bool) -> Self {
306 self.client_rebalance = Some(client_rebalance);
307 self
308 }
309
310 pub fn rpc_hook(mut self, rpc_hook: Option<Arc<Box<dyn RPCHook>>>) -> Self {
311 self.rpc_hook = rpc_hook;
312 self
313 }
314
315 pub fn build(mut self) -> DefaultMQPushConsumer {
317 let mut consumer_config = ConsumerConfig::default();
318 if let Some(consumer_group) = self.consumer_group {
319 consumer_config.consumer_group = consumer_group;
320 }
321 if let Some(message_model) = self.message_model {
322 consumer_config.message_model = message_model;
323 }
324 if let Some(consume_from_where) = self.consume_from_where {
325 consumer_config.consume_from_where = consume_from_where;
326 }
327
328 consumer_config.consume_timestamp = self.consume_timestamp.take();
329
330 if self.allocate_message_queue_strategy.is_some() {
331 consumer_config.allocate_message_queue_strategy =
332 self.allocate_message_queue_strategy.take();
333 }
334 if let Some(subscription) = self.subscription {
335 consumer_config.subscription = subscription;
336 }
337
338 consumer_config.message_queue_listener = self.message_queue_listener.take();
341
342 if let Some(consume_thread_min) = self.consume_thread_min {
343 consumer_config.consume_thread_min = consume_thread_min;
344 }
345 if let Some(consume_thread_max) = self.consume_thread_max {
346 consumer_config.consume_thread_max = consume_thread_max;
347 }
348 if let Some(adjust_thread_pool_nums_threshold) = self.adjust_thread_pool_nums_threshold {
349 consumer_config.adjust_thread_pool_nums_threshold = adjust_thread_pool_nums_threshold;
350 }
351 if let Some(consume_concurrently_max_span) = self.consume_concurrently_max_span {
352 consumer_config.consume_concurrently_max_span = consume_concurrently_max_span;
353 }
354 if let Some(pull_threshold_for_queue) = self.pull_threshold_for_queue {
355 consumer_config.pull_threshold_for_queue = pull_threshold_for_queue;
356 }
357 if let Some(pop_threshold_for_queue) = self.pop_threshold_for_queue {
358 consumer_config.pop_threshold_for_queue = pop_threshold_for_queue;
359 }
360 if let Some(pull_threshold_size_for_queue) = self.pull_threshold_size_for_queue {
361 consumer_config.pull_threshold_size_for_queue = pull_threshold_size_for_queue;
362 }
363 if let Some(pull_threshold_for_topic) = self.pull_threshold_for_topic {
364 consumer_config.pull_threshold_for_topic = pull_threshold_for_topic;
365 }
366 if let Some(pull_threshold_size_for_topic) = self.pull_threshold_size_for_topic {
367 consumer_config.pull_threshold_size_for_topic = pull_threshold_size_for_topic;
368 }
369 if let Some(pull_interval) = self.pull_interval {
370 consumer_config.pull_interval = pull_interval;
371 }
372 if let Some(consume_message_batch_max_size) = self.consume_message_batch_max_size {
373 consumer_config.consume_message_batch_max_size = consume_message_batch_max_size;
374 }
375 if let Some(pull_batch_size) = self.pull_batch_size {
376 consumer_config.pull_batch_size = pull_batch_size;
377 }
378 if let Some(pull_batch_size_in_bytes) = self.pull_batch_size_in_bytes {
379 consumer_config.pull_batch_size_in_bytes = pull_batch_size_in_bytes;
380 }
381 if let Some(post_subscription_when_pull) = self.post_subscription_when_pull {
382 consumer_config.post_subscription_when_pull = post_subscription_when_pull;
383 }
384 if let Some(unit_mode) = self.unit_mode {
385 consumer_config.unit_mode = unit_mode;
386 }
387 if let Some(max_reconsume_times) = self.max_reconsume_times {
388 consumer_config.max_reconsume_times = max_reconsume_times;
389 }
390 if let Some(suspend_current_queue_time_millis) = self.suspend_current_queue_time_millis {
391 consumer_config.suspend_current_queue_time_millis = suspend_current_queue_time_millis;
392 }
393 if let Some(consume_timeout) = self.consume_timeout {
394 consumer_config.consume_timeout = consume_timeout;
395 }
396 if let Some(pop_invisible_time) = self.pop_invisible_time {
397 consumer_config.pop_invisible_time = pop_invisible_time;
398 }
399 if let Some(pop_batch_nums) = self.pop_batch_nums {
400 consumer_config.pop_batch_nums = pop_batch_nums;
401 }
402 if let Some(await_termination_millis_when_shutdown) =
403 self.await_termination_millis_when_shutdown
404 {
405 consumer_config.await_termination_millis_when_shutdown =
406 await_termination_millis_when_shutdown;
407 }
408 consumer_config.trace_dispatcher = self.trace_dispatcher.clone();
409 if let Some(client_rebalance) = self.client_rebalance {
410 consumer_config.client_rebalance = client_rebalance;
411 }
412 consumer_config.rpc_hook = self.rpc_hook.clone();
413
414 let mut consumer = DefaultMQPushConsumer::new(
415 self.client_config.take().unwrap_or_default(),
416 consumer_config,
417 );
418 if self.topic_sub_expression.0.is_some() && self.topic_sub_expression.1.is_some() {
419 let topic = self.topic_sub_expression.0.take().unwrap();
420 let sub_expression = self.topic_sub_expression.1.take().unwrap();
421 consumer
422 .subscribe(&topic, &sub_expression)
423 .expect("subscribe failed");
424 }
425 consumer
426 }
427}