rocketmq_client_rust/consumer/
default_lite_pull_consumer_builder.rs1use std::sync::Arc;
16
17use cheetah_string::CheetahString;
18use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere;
19use rocketmq_common::common::message::message_enum::MessageRequestMode;
20use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
21use rocketmq_remoting::runtime::RPCHook;
22use rocketmq_rust::ArcMut;
23
24use crate::base::client_config::ClientConfig;
25use crate::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy;
26use crate::consumer::consumer_impl::default_lite_pull_consumer_impl::LitePullConsumerConfig;
27use crate::consumer::default_lite_pull_consumer::DefaultLitePullConsumer;
28use crate::consumer::rebalance_strategy::allocate_message_queue_averagely::AllocateMessageQueueAveragely;
29use crate::trace::trace_dispatcher::TraceDispatcher;
30
31pub struct DefaultLitePullConsumerBuilder {
46 name_server_addr: Option<CheetahString>,
48 client_ip: Option<CheetahString>,
49 instance_name: Option<CheetahString>,
50 namespace: Option<CheetahString>,
51 access_channel: Option<CheetahString>,
52
53 consumer_group: Option<CheetahString>,
55 message_model: MessageModel,
56 consume_from_where: ConsumeFromWhere,
57 consume_timestamp: Option<CheetahString>,
58 allocate_message_queue_strategy: Arc<dyn AllocateMessageQueueStrategy + Send + Sync>,
59
60 pull_batch_size: i32,
62 pull_thread_nums: usize,
63
64 pull_threshold_for_queue: i64,
66 pull_threshold_size_for_queue: i32,
67 pull_threshold_for_all: i64,
68 consume_max_span: i64,
69
70 pull_time_delay_millis_when_exception: u64,
72 pull_time_delay_millis_when_cache_flow_control: u64,
73 pull_time_delay_millis_when_broker_flow_control: u64,
74
75 poll_timeout_millis: u64,
77
78 auto_commit: bool,
80 auto_commit_interval_millis: u64,
81
82 topic_metadata_check_interval_millis: u64,
84 message_request_mode: MessageRequestMode,
85
86 rpc_hook: Option<Arc<dyn RPCHook>>,
88 trace_dispatcher: Option<Arc<dyn TraceDispatcher + Send + Sync>>,
89 enable_msg_trace: bool,
90 custom_trace_topic: Option<CheetahString>,
91}
92
93impl Default for DefaultLitePullConsumerBuilder {
94 fn default() -> Self {
95 Self::new()
96 }
97}
98
99impl DefaultLitePullConsumerBuilder {
100 pub fn new() -> Self {
102 Self {
103 name_server_addr: None,
104 client_ip: None,
105 instance_name: None,
106 namespace: None,
107 access_channel: None,
108 consumer_group: None,
109 message_model: MessageModel::Clustering,
110 consume_from_where: ConsumeFromWhere::ConsumeFromLastOffset,
111 consume_timestamp: None,
112 allocate_message_queue_strategy: Arc::new(AllocateMessageQueueAveragely),
113 pull_batch_size: 10,
114 pull_thread_nums: 20,
115 pull_threshold_for_queue: 1000,
116 pull_threshold_size_for_queue: 100,
117 pull_threshold_for_all: -1,
118 consume_max_span: 2000,
119 pull_time_delay_millis_when_exception: 1000,
120 pull_time_delay_millis_when_cache_flow_control: 50,
121 pull_time_delay_millis_when_broker_flow_control: 20,
122 poll_timeout_millis: 5000,
123 auto_commit: true,
124 auto_commit_interval_millis: 5000,
125 topic_metadata_check_interval_millis: 10000,
126 message_request_mode: MessageRequestMode::Pull,
127 rpc_hook: None,
128 trace_dispatcher: None,
129 enable_msg_trace: false,
130 custom_trace_topic: None,
131 }
132 }
133
134 pub fn name_server_addr(mut self, addr: impl Into<CheetahString>) -> Self {
142 self.name_server_addr = Some(addr.into());
143 self
144 }
145
146 pub fn consumer_group(mut self, group: impl Into<CheetahString>) -> Self {
154 self.consumer_group = Some(group.into());
155 self
156 }
157
158 pub fn client_ip(mut self, ip: impl Into<CheetahString>) -> Self {
160 self.client_ip = Some(ip.into());
161 self
162 }
163
164 pub fn instance_name(mut self, name: impl Into<CheetahString>) -> Self {
166 self.instance_name = Some(name.into());
167 self
168 }
169
170 pub fn namespace(mut self, namespace: impl Into<CheetahString>) -> Self {
172 self.namespace = Some(namespace.into());
173 self
174 }
175
176 pub fn message_model(mut self, model: MessageModel) -> Self {
178 self.message_model = model;
179 self
180 }
181
182 pub fn consume_from_where(mut self, consume_from: ConsumeFromWhere) -> Self {
184 self.consume_from_where = consume_from;
185 self
186 }
187
188 pub fn consume_timestamp(mut self, timestamp: impl Into<CheetahString>) -> Self {
190 self.consume_timestamp = Some(timestamp.into());
191 self
192 }
193
194 pub fn allocate_message_queue_strategy(
196 mut self,
197 strategy: Arc<dyn AllocateMessageQueueStrategy + Send + Sync>,
198 ) -> Self {
199 self.allocate_message_queue_strategy = strategy;
200 self
201 }
202
203 pub fn pull_batch_size(mut self, size: i32) -> Self {
205 self.pull_batch_size = size;
206 self
207 }
208
209 pub fn pull_thread_nums(mut self, nums: usize) -> Self {
211 self.pull_thread_nums = nums;
212 self
213 }
214
215 pub fn pull_threshold_for_queue(mut self, threshold: i64) -> Self {
217 self.pull_threshold_for_queue = threshold;
218 self
219 }
220
221 pub fn pull_threshold_size_for_queue(mut self, threshold: i32) -> Self {
223 self.pull_threshold_size_for_queue = threshold;
224 self
225 }
226
227 pub fn pull_threshold_for_all(mut self, threshold: i64) -> Self {
230 self.pull_threshold_for_all = threshold;
231 self
232 }
233
234 pub fn consume_max_span(mut self, span: i64) -> Self {
236 self.consume_max_span = span;
237 self
238 }
239
240 pub fn pull_time_delay_millis_when_exception(mut self, delay: u64) -> Self {
242 self.pull_time_delay_millis_when_exception = delay;
243 self
244 }
245
246 pub fn pull_time_delay_millis_when_cache_flow_control(mut self, delay: u64) -> Self {
248 self.pull_time_delay_millis_when_cache_flow_control = delay;
249 self
250 }
251
252 pub fn pull_time_delay_millis_when_broker_flow_control(mut self, delay: u64) -> Self {
254 self.pull_time_delay_millis_when_broker_flow_control = delay;
255 self
256 }
257
258 pub fn poll_timeout_millis(mut self, timeout: u64) -> Self {
260 self.poll_timeout_millis = timeout;
261 self
262 }
263
264 pub fn auto_commit(mut self, enable: bool) -> Self {
266 self.auto_commit = enable;
267 self
268 }
269
270 pub fn auto_commit_interval_millis(mut self, interval: u64) -> Self {
272 self.auto_commit_interval_millis = interval.max(1000);
273 self
274 }
275
276 pub fn topic_metadata_check_interval_millis(mut self, interval: u64) -> Self {
278 self.topic_metadata_check_interval_millis = interval;
279 self
280 }
281
282 pub fn message_request_mode(mut self, mode: MessageRequestMode) -> Self {
284 self.message_request_mode = mode;
285 self
286 }
287
288 pub fn rpc_hook(mut self, hook: Arc<dyn RPCHook>) -> Self {
290 self.rpc_hook = Some(hook);
291 self
292 }
293
294 pub fn enable_msg_trace(mut self) -> Self {
296 self.enable_msg_trace = true;
297 self
298 }
299
300 pub fn enable_msg_trace_with_topic(mut self, trace_topic: impl Into<CheetahString>) -> Self {
302 self.enable_msg_trace = true;
303 self.custom_trace_topic = Some(trace_topic.into());
304 self
305 }
306
307 pub fn trace_dispatcher(mut self, dispatcher: Arc<dyn TraceDispatcher + Send + Sync>) -> Self {
309 self.trace_dispatcher = Some(dispatcher);
310 self
311 }
312
313 pub fn build(self) -> DefaultLitePullConsumer {
319 let consumer_group = self.consumer_group.expect("consumer_group is required");
320 let name_server_addr = self.name_server_addr.expect("name_server_addr is required");
321
322 let mut client_config = ClientConfig::default();
323 client_config.set_namesrv_addr(name_server_addr);
324
325 if let Some(ip) = self.client_ip {
326 client_config.set_client_ip(ip);
327 }
328
329 if let Some(name) = self.instance_name {
330 client_config.set_instance_name(name);
331 }
332
333 if let Some(namespace) = self.namespace {
334 client_config.set_namespace(namespace);
335 }
336
337 let consumer_config = LitePullConsumerConfig {
338 consumer_group,
339 message_model: self.message_model,
340 consume_from_where: self.consume_from_where,
341 consume_timestamp: self.consume_timestamp,
342 allocate_message_queue_strategy: self.allocate_message_queue_strategy,
343 pull_batch_size: self.pull_batch_size,
344 pull_thread_nums: self.pull_thread_nums,
345 pull_threshold_for_queue: self.pull_threshold_for_queue,
346 pull_threshold_size_for_queue: self.pull_threshold_size_for_queue,
347 pull_threshold_for_all: self.pull_threshold_for_all,
348 consume_max_span: self.consume_max_span,
349 pull_time_delay_millis_when_exception: self.pull_time_delay_millis_when_exception,
350 pull_time_delay_millis_when_cache_flow_control: self.pull_time_delay_millis_when_cache_flow_control,
351 pull_time_delay_millis_when_broker_flow_control: self.pull_time_delay_millis_when_broker_flow_control,
352 poll_timeout_millis: self.poll_timeout_millis,
353 auto_commit: self.auto_commit,
354 auto_commit_interval_millis: self.auto_commit_interval_millis,
355 topic_metadata_check_interval_millis: self.topic_metadata_check_interval_millis,
356 message_request_mode: self.message_request_mode,
357 };
358
359 DefaultLitePullConsumer::new(
360 ArcMut::new(client_config),
361 ArcMut::new(consumer_config),
362 self.rpc_hook,
363 self.trace_dispatcher,
364 self.enable_msg_trace,
365 self.custom_trace_topic,
366 )
367 }
368}