rocketmq_client_rust/consumer/
default_mq_push_consumer_builder.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::collections::HashMap;
18use std::sync::Arc;
19
20use cheetah_string::CheetahString;
21use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere;
22use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
23use rocketmq_remoting::runtime::RPCHook;
24use rocketmq_rust::ArcMut;
25
26use crate::base::client_config::ClientConfig;
27use crate::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy;
28use crate::consumer::default_mq_push_consumer::ConsumerConfig;
29use crate::consumer::default_mq_push_consumer::DefaultMQPushConsumer;
30use crate::consumer::message_queue_listener::MessageQueueListener;
31use crate::consumer::mq_push_consumer::MQPushConsumer;
32use crate::trace::trace_dispatcher::TraceDispatcher;
33
/// Builder that collects optional settings and turns them into a
/// [`DefaultMQPushConsumer`].
///
/// Most fields are `Option`s that mirror a same-named field on
/// [`ConsumerConfig`]; the builder's `build` method starts from
/// `ConsumerConfig::default()` and copies the collected values into it.
pub struct DefaultMQPushConsumerBuilder {
    /// Client-level configuration handed to `DefaultMQPushConsumer::new`.
    client_config: Option<ClientConfig>,
    /// Optional value for `ConsumerConfig::consumer_group`.
    consumer_group: Option<CheetahString>,
    /// `(topic, sub_expression)` pair recorded by `subscribe`.
    topic_sub_expression: (Option<CheetahString>, Option<CheetahString>),
    /// Optional value for `ConsumerConfig::message_model`.
    message_model: Option<MessageModel>,
    /// Optional value for `ConsumerConfig::consume_from_where`.
    consume_from_where: Option<ConsumeFromWhere>,
    /// Optional value for `ConsumerConfig::consume_timestamp`.
    consume_timestamp: Option<CheetahString>,
    /// Optional value for `ConsumerConfig::allocate_message_queue_strategy`.
    allocate_message_queue_strategy: Option<Arc<dyn AllocateMessageQueueStrategy>>,
    /// Optional value for `ConsumerConfig::subscription`.
    // NOTE(review): no setter for this field is visible in this file, so it
    // appears to always stay `None` — confirm whether a setter is missing.
    subscription: Option<ArcMut<HashMap<CheetahString, CheetahString>>>,

    /// Optional value for `ConsumerConfig::message_queue_listener`.
    message_queue_listener: Option<Arc<Box<dyn MessageQueueListener>>>,

    // The remaining fields are optional values for the same-named
    // `ConsumerConfig` fields.
    consume_thread_min: Option<u32>,
    consume_thread_max: Option<u32>,
    adjust_thread_pool_nums_threshold: Option<u64>,
    consume_concurrently_max_span: Option<u32>,
    pull_threshold_for_queue: Option<u32>,
    pop_threshold_for_queue: Option<u32>,
    pull_threshold_size_for_queue: Option<u32>,
    pull_threshold_for_topic: Option<i32>,
    pull_threshold_size_for_topic: Option<i32>,
    pull_interval: Option<u64>,
    consume_message_batch_max_size: Option<u32>,
    pull_batch_size: Option<u32>,
    pull_batch_size_in_bytes: Option<u32>,
    post_subscription_when_pull: Option<bool>,
    unit_mode: Option<bool>,
    max_reconsume_times: Option<i32>,
    suspend_current_queue_time_millis: Option<u64>,
    consume_timeout: Option<u64>,
    pop_invisible_time: Option<u64>,
    pop_batch_nums: Option<u32>,
    await_termination_millis_when_shutdown: Option<u64>,
    trace_dispatcher: Option<Arc<Box<dyn TraceDispatcher + Send + Sync>>>,
    client_rebalance: Option<bool>,
    rpc_hook: Option<Arc<Box<dyn RPCHook>>>,
}
71
72impl Default for DefaultMQPushConsumerBuilder {
73    fn default() -> Self {
74        Self {
75            client_config: Some(Default::default()),
76            consumer_group: None,
77            topic_sub_expression: (None, None),
78            message_model: None,
79            consume_from_where: None,
80            consume_timestamp: None,
81            allocate_message_queue_strategy: None,
82            subscription: None,
83            message_queue_listener: None,
84            consume_thread_min: None,
85            consume_thread_max: None,
86            adjust_thread_pool_nums_threshold: None,
87            consume_concurrently_max_span: None,
88            pull_threshold_for_queue: None,
89            pop_threshold_for_queue: None,
90            pull_threshold_size_for_queue: None,
91            pull_threshold_for_topic: None,
92            pull_threshold_size_for_topic: None,
93            pull_interval: None,
94            consume_message_batch_max_size: None,
95            pull_batch_size: None,
96            pull_batch_size_in_bytes: None,
97            post_subscription_when_pull: None,
98            unit_mode: None,
99            max_reconsume_times: None,
100            suspend_current_queue_time_millis: None,
101            consume_timeout: None,
102            pop_invisible_time: None,
103            pop_batch_nums: None,
104            await_termination_millis_when_shutdown: None,
105            trace_dispatcher: None,
106            client_rebalance: None,
107            rpc_hook: None,
108        }
109    }
110}
111
112impl DefaultMQPushConsumerBuilder {
113    pub fn name_server_addr(mut self, name_server_addr: impl Into<CheetahString>) -> Self {
114        if let Some(client_config) = self.client_config.as_mut() {
115            client_config.namesrv_addr = Some(name_server_addr.into());
116            client_config
117                .namespace_initialized
118                .store(false, std::sync::atomic::Ordering::Release);
119        }
120        self
121    }
122
123    pub fn client_config(mut self, client_config: ClientConfig) -> Self {
124        self.client_config = Some(client_config);
125        self
126    }
127
128    // Methods to set each field
129    pub fn consumer_group(mut self, consumer_group: impl Into<CheetahString>) -> Self {
130        self.consumer_group = Some(consumer_group.into());
131        self
132    }
133
134    pub fn message_model(mut self, message_model: MessageModel) -> Self {
135        self.message_model = Some(message_model);
136        self
137    }
138
139    pub fn consume_from_where(mut self, consume_from_where: ConsumeFromWhere) -> Self {
140        self.consume_from_where = Some(consume_from_where);
141        self
142    }
143
144    pub fn consume_timestamp(mut self, consume_timestamp: impl Into<CheetahString>) -> Self {
145        self.consume_timestamp = Some(consume_timestamp.into());
146        self
147    }
148
149    pub fn allocate_message_queue_strategy(
150        mut self,
151        allocate_message_queue_strategy: Arc<dyn AllocateMessageQueueStrategy>,
152    ) -> Self {
153        self.allocate_message_queue_strategy = Some(allocate_message_queue_strategy);
154        self
155    }
156
157    pub fn subscribe(
158        mut self,
159        topic: impl Into<CheetahString>,
160        sub_expression: impl Into<CheetahString>,
161    ) -> Self {
162        self.topic_sub_expression.0 = Some(topic.into());
163        self.topic_sub_expression.1 = Some(sub_expression.into());
164        self
165    }
166
167    /*    pub fn message_listener(
168        mut self,
169        message_listener: Option<Arc<Box<dyn MessageListener>>>,
170    ) -> Self {
171        self.message_listener = message_listener;
172        self
173    }*/
174
175    pub fn message_queue_listener(
176        mut self,
177        message_queue_listener: Option<Arc<Box<dyn MessageQueueListener>>>,
178    ) -> Self {
179        self.message_queue_listener = message_queue_listener;
180        self
181    }
182
183    pub fn consume_thread_min(mut self, consume_thread_min: u32) -> Self {
184        self.consume_thread_min = Some(consume_thread_min);
185        self
186    }
187
188    pub fn consume_thread_max(mut self, consume_thread_max: u32) -> Self {
189        self.consume_thread_max = Some(consume_thread_max);
190        self
191    }
192
193    pub fn adjust_thread_pool_nums_threshold(
194        mut self,
195        adjust_thread_pool_nums_threshold: u64,
196    ) -> Self {
197        self.adjust_thread_pool_nums_threshold = Some(adjust_thread_pool_nums_threshold);
198        self
199    }
200
201    pub fn consume_concurrently_max_span(mut self, consume_concurrently_max_span: u32) -> Self {
202        self.consume_concurrently_max_span = Some(consume_concurrently_max_span);
203        self
204    }
205
206    pub fn pull_threshold_for_queue(mut self, pull_threshold_for_queue: u32) -> Self {
207        self.pull_threshold_for_queue = Some(pull_threshold_for_queue);
208        self
209    }
210
211    pub fn pop_threshold_for_queue(mut self, pop_threshold_for_queue: u32) -> Self {
212        self.pop_threshold_for_queue = Some(pop_threshold_for_queue);
213        self
214    }
215
216    pub fn pull_threshold_size_for_queue(mut self, pull_threshold_size_for_queue: u32) -> Self {
217        self.pull_threshold_size_for_queue = Some(pull_threshold_size_for_queue);
218        self
219    }
220
221    pub fn pull_threshold_for_topic(mut self, pull_threshold_for_topic: i32) -> Self {
222        self.pull_threshold_for_topic = Some(pull_threshold_for_topic);
223        self
224    }
225
226    pub fn pull_threshold_size_for_topic(mut self, pull_threshold_size_for_topic: i32) -> Self {
227        self.pull_threshold_size_for_topic = Some(pull_threshold_size_for_topic);
228        self
229    }
230
231    pub fn pull_interval(mut self, pull_interval: u64) -> Self {
232        self.pull_interval = Some(pull_interval);
233        self
234    }
235
236    pub fn consume_message_batch_max_size(mut self, consume_message_batch_max_size: u32) -> Self {
237        self.consume_message_batch_max_size = Some(consume_message_batch_max_size);
238        self
239    }
240
241    pub fn pull_batch_size(mut self, pull_batch_size: u32) -> Self {
242        self.pull_batch_size = Some(pull_batch_size);
243        self
244    }
245
246    pub fn pull_batch_size_in_bytes(mut self, pull_batch_size_in_bytes: u32) -> Self {
247        self.pull_batch_size_in_bytes = Some(pull_batch_size_in_bytes);
248        self
249    }
250
251    pub fn post_subscription_when_pull(mut self, post_subscription_when_pull: bool) -> Self {
252        self.post_subscription_when_pull = Some(post_subscription_when_pull);
253        self
254    }
255
256    pub fn unit_mode(mut self, unit_mode: bool) -> Self {
257        self.unit_mode = Some(unit_mode);
258        self
259    }
260
261    pub fn max_reconsume_times(mut self, max_reconsume_times: i32) -> Self {
262        self.max_reconsume_times = Some(max_reconsume_times);
263        self
264    }
265
266    pub fn suspend_current_queue_time_millis(
267        mut self,
268        suspend_current_queue_time_millis: u64,
269    ) -> Self {
270        self.suspend_current_queue_time_millis = Some(suspend_current_queue_time_millis);
271        self
272    }
273
274    pub fn consume_timeout(mut self, consume_timeout: u64) -> Self {
275        self.consume_timeout = Some(consume_timeout);
276        self
277    }
278
279    pub fn pop_invisible_time(mut self, pop_invisible_time: u64) -> Self {
280        self.pop_invisible_time = Some(pop_invisible_time);
281        self
282    }
283
284    pub fn pop_batch_nums(mut self, pop_batch_nums: u32) -> Self {
285        self.pop_batch_nums = Some(pop_batch_nums);
286        self
287    }
288
289    pub fn await_termination_millis_when_shutdown(
290        mut self,
291        await_termination_millis_when_shutdown: u64,
292    ) -> Self {
293        self.await_termination_millis_when_shutdown = Some(await_termination_millis_when_shutdown);
294        self
295    }
296
297    pub fn trace_dispatcher(
298        mut self,
299        trace_dispatcher: Option<Arc<Box<dyn TraceDispatcher + Send + Sync>>>,
300    ) -> Self {
301        self.trace_dispatcher = trace_dispatcher;
302        self
303    }
304
305    pub fn client_rebalance(mut self, client_rebalance: bool) -> Self {
306        self.client_rebalance = Some(client_rebalance);
307        self
308    }
309
310    pub fn rpc_hook(mut self, rpc_hook: Option<Arc<Box<dyn RPCHook>>>) -> Self {
311        self.rpc_hook = rpc_hook;
312        self
313    }
314
315    // Build method to create a ConsumerConfig instance
316    pub fn build(mut self) -> DefaultMQPushConsumer {
317        let mut consumer_config = ConsumerConfig::default();
318        if let Some(consumer_group) = self.consumer_group {
319            consumer_config.consumer_group = consumer_group;
320        }
321        if let Some(message_model) = self.message_model {
322            consumer_config.message_model = message_model;
323        }
324        if let Some(consume_from_where) = self.consume_from_where {
325            consumer_config.consume_from_where = consume_from_where;
326        }
327
328        consumer_config.consume_timestamp = self.consume_timestamp.take();
329
330        if self.allocate_message_queue_strategy.is_some() {
331            consumer_config.allocate_message_queue_strategy =
332                self.allocate_message_queue_strategy.take();
333        }
334        if let Some(subscription) = self.subscription {
335            consumer_config.subscription = subscription;
336        }
337
338        //consumer_config.message_listener = self.message_listener.take();
339
340        consumer_config.message_queue_listener = self.message_queue_listener.take();
341
342        if let Some(consume_thread_min) = self.consume_thread_min {
343            consumer_config.consume_thread_min = consume_thread_min;
344        }
345        if let Some(consume_thread_max) = self.consume_thread_max {
346            consumer_config.consume_thread_max = consume_thread_max;
347        }
348        if let Some(adjust_thread_pool_nums_threshold) = self.adjust_thread_pool_nums_threshold {
349            consumer_config.adjust_thread_pool_nums_threshold = adjust_thread_pool_nums_threshold;
350        }
351        if let Some(consume_concurrently_max_span) = self.consume_concurrently_max_span {
352            consumer_config.consume_concurrently_max_span = consume_concurrently_max_span;
353        }
354        if let Some(pull_threshold_for_queue) = self.pull_threshold_for_queue {
355            consumer_config.pull_threshold_for_queue = pull_threshold_for_queue;
356        }
357        if let Some(pop_threshold_for_queue) = self.pop_threshold_for_queue {
358            consumer_config.pop_threshold_for_queue = pop_threshold_for_queue;
359        }
360        if let Some(pull_threshold_size_for_queue) = self.pull_threshold_size_for_queue {
361            consumer_config.pull_threshold_size_for_queue = pull_threshold_size_for_queue;
362        }
363        if let Some(pull_threshold_for_topic) = self.pull_threshold_for_topic {
364            consumer_config.pull_threshold_for_topic = pull_threshold_for_topic;
365        }
366        if let Some(pull_threshold_size_for_topic) = self.pull_threshold_size_for_topic {
367            consumer_config.pull_threshold_size_for_topic = pull_threshold_size_for_topic;
368        }
369        if let Some(pull_interval) = self.pull_interval {
370            consumer_config.pull_interval = pull_interval;
371        }
372        if let Some(consume_message_batch_max_size) = self.consume_message_batch_max_size {
373            consumer_config.consume_message_batch_max_size = consume_message_batch_max_size;
374        }
375        if let Some(pull_batch_size) = self.pull_batch_size {
376            consumer_config.pull_batch_size = pull_batch_size;
377        }
378        if let Some(pull_batch_size_in_bytes) = self.pull_batch_size_in_bytes {
379            consumer_config.pull_batch_size_in_bytes = pull_batch_size_in_bytes;
380        }
381        if let Some(post_subscription_when_pull) = self.post_subscription_when_pull {
382            consumer_config.post_subscription_when_pull = post_subscription_when_pull;
383        }
384        if let Some(unit_mode) = self.unit_mode {
385            consumer_config.unit_mode = unit_mode;
386        }
387        if let Some(max_reconsume_times) = self.max_reconsume_times {
388            consumer_config.max_reconsume_times = max_reconsume_times;
389        }
390        if let Some(suspend_current_queue_time_millis) = self.suspend_current_queue_time_millis {
391            consumer_config.suspend_current_queue_time_millis = suspend_current_queue_time_millis;
392        }
393        if let Some(consume_timeout) = self.consume_timeout {
394            consumer_config.consume_timeout = consume_timeout;
395        }
396        if let Some(pop_invisible_time) = self.pop_invisible_time {
397            consumer_config.pop_invisible_time = pop_invisible_time;
398        }
399        if let Some(pop_batch_nums) = self.pop_batch_nums {
400            consumer_config.pop_batch_nums = pop_batch_nums;
401        }
402        if let Some(await_termination_millis_when_shutdown) =
403            self.await_termination_millis_when_shutdown
404        {
405            consumer_config.await_termination_millis_when_shutdown =
406                await_termination_millis_when_shutdown;
407        }
408        consumer_config.trace_dispatcher = self.trace_dispatcher.clone();
409        if let Some(client_rebalance) = self.client_rebalance {
410            consumer_config.client_rebalance = client_rebalance;
411        }
412        consumer_config.rpc_hook = self.rpc_hook.clone();
413
414        let mut consumer = DefaultMQPushConsumer::new(
415            self.client_config.take().unwrap_or_default(),
416            consumer_config,
417        );
418        if self.topic_sub_expression.0.is_some() && self.topic_sub_expression.1.is_some() {
419            let topic = self.topic_sub_expression.0.take().unwrap();
420            let sub_expression = self.topic_sub_expression.1.take().unwrap();
421            consumer
422                .subscribe(&topic, &sub_expression)
423                .expect("subscribe failed");
424        }
425        consumer
426    }
427}