Skip to main content

rocketmq_client_rust/consumer/
default_lite_pull_consumer_builder.rs

// Copyright 2023 The RocketMQ Rust Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16
17use cheetah_string::CheetahString;
18use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere;
19use rocketmq_common::common::message::message_enum::MessageRequestMode;
20use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
21use rocketmq_remoting::runtime::RPCHook;
22use rocketmq_rust::ArcMut;
23
24use crate::base::client_config::ClientConfig;
25use crate::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy;
26use crate::consumer::consumer_impl::default_lite_pull_consumer_impl::LitePullConsumerConfig;
27use crate::consumer::default_lite_pull_consumer::DefaultLitePullConsumer;
28use crate::consumer::rebalance_strategy::allocate_message_queue_averagely::AllocateMessageQueueAveragely;
29use crate::trace::trace_dispatcher::TraceDispatcher;
30
31/// Builder for creating a [`DefaultLitePullConsumer`] with customized configuration.
32///
33/// # Examples
34///
35/// ```rust,ignore
36/// use rocketmq_client::consumer::default_lite_pull_consumer::DefaultLitePullConsumer;
37///
38/// let consumer = DefaultLitePullConsumer::builder()
39///     .consumer_group("my_consumer_group")
40///     .name_server_addr("127.0.0.1:9876")
41///     .pull_batch_size(32)
42///     .auto_commit(true)
43///     .build();
44/// ```
45pub struct DefaultLitePullConsumerBuilder {
46    // Client configuration
47    name_server_addr: Option<CheetahString>,
48    client_ip: Option<CheetahString>,
49    instance_name: Option<CheetahString>,
50    namespace: Option<CheetahString>,
51    access_channel: Option<CheetahString>,
52
53    // Consumer configuration
54    consumer_group: Option<CheetahString>,
55    message_model: MessageModel,
56    consume_from_where: ConsumeFromWhere,
57    consume_timestamp: Option<CheetahString>,
58    allocate_message_queue_strategy: Arc<dyn AllocateMessageQueueStrategy + Send + Sync>,
59
60    // Pull configuration
61    pull_batch_size: i32,
62    pull_thread_nums: usize,
63
64    // Flow control
65    pull_threshold_for_queue: i64,
66    pull_threshold_size_for_queue: i32,
67    pull_threshold_for_all: i64,
68    consume_max_span: i64,
69
70    // Backoff delays
71    pull_time_delay_millis_when_exception: u64,
72    pull_time_delay_millis_when_cache_flow_control: u64,
73    pull_time_delay_millis_when_broker_flow_control: u64,
74
75    // Poll configuration
76    poll_timeout_millis: u64,
77
78    // Auto-commit
79    auto_commit: bool,
80    auto_commit_interval_millis: u64,
81
82    // Miscellaneous
83    topic_metadata_check_interval_millis: u64,
84    message_request_mode: MessageRequestMode,
85
86    // Advanced
87    rpc_hook: Option<Arc<dyn RPCHook>>,
88    trace_dispatcher: Option<Arc<dyn TraceDispatcher + Send + Sync>>,
89    enable_msg_trace: bool,
90    custom_trace_topic: Option<CheetahString>,
91}
92
93impl Default for DefaultLitePullConsumerBuilder {
94    fn default() -> Self {
95        Self::new()
96    }
97}
98
99impl DefaultLitePullConsumerBuilder {
100    /// Creates a new builder with default configuration.
101    pub fn new() -> Self {
102        Self {
103            name_server_addr: None,
104            client_ip: None,
105            instance_name: None,
106            namespace: None,
107            access_channel: None,
108            consumer_group: None,
109            message_model: MessageModel::Clustering,
110            consume_from_where: ConsumeFromWhere::ConsumeFromLastOffset,
111            consume_timestamp: None,
112            allocate_message_queue_strategy: Arc::new(AllocateMessageQueueAveragely),
113            pull_batch_size: 10,
114            pull_thread_nums: 20,
115            pull_threshold_for_queue: 1000,
116            pull_threshold_size_for_queue: 100,
117            pull_threshold_for_all: -1,
118            consume_max_span: 2000,
119            pull_time_delay_millis_when_exception: 1000,
120            pull_time_delay_millis_when_cache_flow_control: 50,
121            pull_time_delay_millis_when_broker_flow_control: 20,
122            poll_timeout_millis: 5000,
123            auto_commit: true,
124            auto_commit_interval_millis: 5000,
125            topic_metadata_check_interval_millis: 10000,
126            message_request_mode: MessageRequestMode::Pull,
127            rpc_hook: None,
128            trace_dispatcher: None,
129            enable_msg_trace: false,
130            custom_trace_topic: None,
131        }
132    }
133
134    /// Sets the name server address (required).
135    ///
136    /// # Examples
137    ///
138    /// ```rust,ignore
139    /// builder.name_server_addr("127.0.0.1:9876");
140    /// ```
141    pub fn name_server_addr(mut self, addr: impl Into<CheetahString>) -> Self {
142        self.name_server_addr = Some(addr.into());
143        self
144    }
145
146    /// Sets the consumer group name (required).
147    ///
148    /// # Examples
149    ///
150    /// ```rust,ignore
151    /// builder.consumer_group("my_consumer_group");
152    /// ```
153    pub fn consumer_group(mut self, group: impl Into<CheetahString>) -> Self {
154        self.consumer_group = Some(group.into());
155        self
156    }
157
158    /// Sets the client IP address (optional, auto-detected by default).
159    pub fn client_ip(mut self, ip: impl Into<CheetahString>) -> Self {
160        self.client_ip = Some(ip.into());
161        self
162    }
163
164    /// Sets the instance name (optional, auto-generated by default).
165    pub fn instance_name(mut self, name: impl Into<CheetahString>) -> Self {
166        self.instance_name = Some(name.into());
167        self
168    }
169
170    /// Sets the namespace (optional).
171    pub fn namespace(mut self, namespace: impl Into<CheetahString>) -> Self {
172        self.namespace = Some(namespace.into());
173        self
174    }
175
176    /// Sets the message model (default: Clustering).
177    pub fn message_model(mut self, model: MessageModel) -> Self {
178        self.message_model = model;
179        self
180    }
181
182    /// Sets where to start consuming from when no offset exists (default: LastOffset).
183    pub fn consume_from_where(mut self, consume_from: ConsumeFromWhere) -> Self {
184        self.consume_from_where = consume_from;
185        self
186    }
187
188    /// Sets the timestamp to consume from (for CONSUME_FROM_TIMESTAMP mode).
189    pub fn consume_timestamp(mut self, timestamp: impl Into<CheetahString>) -> Self {
190        self.consume_timestamp = Some(timestamp.into());
191        self
192    }
193
194    /// Sets the message queue allocation strategy.
195    pub fn allocate_message_queue_strategy(
196        mut self,
197        strategy: Arc<dyn AllocateMessageQueueStrategy + Send + Sync>,
198    ) -> Self {
199        self.allocate_message_queue_strategy = strategy;
200        self
201    }
202
203    /// Sets the number of messages to pull in a single request (default: 10, range: 1-1024).
204    pub fn pull_batch_size(mut self, size: i32) -> Self {
205        self.pull_batch_size = size;
206        self
207    }
208
209    /// Sets the number of concurrent pull threads (default: 20).
210    pub fn pull_thread_nums(mut self, nums: usize) -> Self {
211        self.pull_thread_nums = nums;
212        self
213    }
214
215    /// Sets the maximum number of messages cached per queue (default: 1000).
216    pub fn pull_threshold_for_queue(mut self, threshold: i64) -> Self {
217        self.pull_threshold_for_queue = threshold;
218        self
219    }
220
221    /// Sets the maximum size in MiB of messages cached per queue (default: 100).
222    pub fn pull_threshold_size_for_queue(mut self, threshold: i32) -> Self {
223        self.pull_threshold_size_for_queue = threshold;
224        self
225    }
226
227    /// Sets the maximum total number of cached messages across all queues (default: -1 for
228    /// unlimited).
229    pub fn pull_threshold_for_all(mut self, threshold: i64) -> Self {
230        self.pull_threshold_for_all = threshold;
231        self
232    }
233
234    /// Sets the maximum offset span allowed in a process queue (default: 2000).
235    pub fn consume_max_span(mut self, span: i64) -> Self {
236        self.consume_max_span = span;
237        self
238    }
239
240    /// Sets the delay when pull encounters an exception (default: 1000ms).
241    pub fn pull_time_delay_millis_when_exception(mut self, delay: u64) -> Self {
242        self.pull_time_delay_millis_when_exception = delay;
243        self
244    }
245
246    /// Sets the delay when cache flow control is triggered (default: 50ms).
247    pub fn pull_time_delay_millis_when_cache_flow_control(mut self, delay: u64) -> Self {
248        self.pull_time_delay_millis_when_cache_flow_control = delay;
249        self
250    }
251
252    /// Sets the delay when broker flow control is triggered (default: 20ms).
253    pub fn pull_time_delay_millis_when_broker_flow_control(mut self, delay: u64) -> Self {
254        self.pull_time_delay_millis_when_broker_flow_control = delay;
255        self
256    }
257
258    /// Sets the default poll timeout in milliseconds (default: 5000).
259    pub fn poll_timeout_millis(mut self, timeout: u64) -> Self {
260        self.poll_timeout_millis = timeout;
261        self
262    }
263
264    /// Sets whether to automatically commit offsets (default: true).
265    pub fn auto_commit(mut self, enable: bool) -> Self {
266        self.auto_commit = enable;
267        self
268    }
269
270    /// Sets the interval between automatic offset commits (default: 5000ms, minimum: 1000ms).
271    pub fn auto_commit_interval_millis(mut self, interval: u64) -> Self {
272        self.auto_commit_interval_millis = interval.max(1000);
273        self
274    }
275
276    /// Sets the interval for checking topic metadata changes (default: 10000ms).
277    pub fn topic_metadata_check_interval_millis(mut self, interval: u64) -> Self {
278        self.topic_metadata_check_interval_millis = interval;
279        self
280    }
281
282    /// Sets the message request mode (default: Pull).
283    pub fn message_request_mode(mut self, mode: MessageRequestMode) -> Self {
284        self.message_request_mode = mode;
285        self
286    }
287
288    /// Sets the RPC hook for request/response interception.
289    pub fn rpc_hook(mut self, hook: Arc<dyn RPCHook>) -> Self {
290        self.rpc_hook = Some(hook);
291        self
292    }
293
294    /// Enables message trace with the default trace topic.
295    pub fn enable_msg_trace(mut self) -> Self {
296        self.enable_msg_trace = true;
297        self
298    }
299
300    /// Enables message trace with a custom trace topic.
301    pub fn enable_msg_trace_with_topic(mut self, trace_topic: impl Into<CheetahString>) -> Self {
302        self.enable_msg_trace = true;
303        self.custom_trace_topic = Some(trace_topic.into());
304        self
305    }
306
307    /// Sets a custom trace dispatcher.
308    pub fn trace_dispatcher(mut self, dispatcher: Arc<dyn TraceDispatcher + Send + Sync>) -> Self {
309        self.trace_dispatcher = Some(dispatcher);
310        self
311    }
312
313    /// Builds the [`DefaultLitePullConsumer`].
314    ///
315    /// # Panics
316    ///
317    /// Panics if required fields (consumer_group, name_server_addr) are not set.
318    pub fn build(self) -> DefaultLitePullConsumer {
319        let consumer_group = self.consumer_group.expect("consumer_group is required");
320        let name_server_addr = self.name_server_addr.expect("name_server_addr is required");
321
322        let mut client_config = ClientConfig::default();
323        client_config.set_namesrv_addr(name_server_addr);
324
325        if let Some(ip) = self.client_ip {
326            client_config.set_client_ip(ip);
327        }
328
329        if let Some(name) = self.instance_name {
330            client_config.set_instance_name(name);
331        }
332
333        if let Some(namespace) = self.namespace {
334            client_config.set_namespace(namespace);
335        }
336
337        let consumer_config = LitePullConsumerConfig {
338            consumer_group,
339            message_model: self.message_model,
340            consume_from_where: self.consume_from_where,
341            consume_timestamp: self.consume_timestamp,
342            allocate_message_queue_strategy: self.allocate_message_queue_strategy,
343            pull_batch_size: self.pull_batch_size,
344            pull_thread_nums: self.pull_thread_nums,
345            pull_threshold_for_queue: self.pull_threshold_for_queue,
346            pull_threshold_size_for_queue: self.pull_threshold_size_for_queue,
347            pull_threshold_for_all: self.pull_threshold_for_all,
348            consume_max_span: self.consume_max_span,
349            pull_time_delay_millis_when_exception: self.pull_time_delay_millis_when_exception,
350            pull_time_delay_millis_when_cache_flow_control: self.pull_time_delay_millis_when_cache_flow_control,
351            pull_time_delay_millis_when_broker_flow_control: self.pull_time_delay_millis_when_broker_flow_control,
352            poll_timeout_millis: self.poll_timeout_millis,
353            auto_commit: self.auto_commit,
354            auto_commit_interval_millis: self.auto_commit_interval_millis,
355            topic_metadata_check_interval_millis: self.topic_metadata_check_interval_millis,
356            message_request_mode: self.message_request_mode,
357        };
358
359        DefaultLitePullConsumer::new(
360            ArcMut::new(client_config),
361            ArcMut::new(consumer_config),
362            self.rpc_hook,
363            self.trace_dispatcher,
364            self.enable_msg_trace,
365            self.custom_trace_topic,
366        )
367    }
368}