rocketmq_client_rust/consumer/
default_mq_push_consumer.rs1use std::collections::HashMap;
16use std::sync::Arc;
17
18use cheetah_string::CheetahString;
19use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere;
20use rocketmq_common::common::message::message_ext::MessageExt;
21use rocketmq_common::common::message::message_queue::MessageQueue;
22use rocketmq_common::utils::util_all;
23use rocketmq_common::TimeUtils::current_millis;
24use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
25use rocketmq_remoting::protocol::namespace_util::NamespaceUtil;
26use rocketmq_remoting::runtime::RPCHook;
27use rocketmq_rust::ArcMut;
28
29use crate::base::client_config::ClientConfig;
30use crate::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy;
31use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPushConsumerImpl;
32use crate::consumer::default_mq_push_consumer_builder::DefaultMQPushConsumerBuilder;
33use crate::consumer::listener::message_listener::MessageListener;
34use crate::consumer::listener::message_listener_concurrently::MessageListenerConcurrently;
35use crate::consumer::listener::message_listener_orderly::MessageListenerOrderly;
36use crate::consumer::message_queue_listener::ArcMessageQueueListener;
37use crate::consumer::message_selector::MessageSelector;
38use crate::consumer::mq_consumer::MQConsumer;
39use crate::consumer::mq_push_consumer::MQPushConsumer;
40use crate::consumer::rebalance_strategy::allocate_message_queue_averagely::AllocateMessageQueueAveragely;
41use crate::trace::async_trace_dispatcher::AsyncTraceDispatcher;
42use crate::trace::hook::consume_message_trace_hook_impl::ConsumeMessageTraceHookImpl;
43use crate::trace::trace_dispatcher::ArcTraceDispatcher;
44use crate::trace::trace_dispatcher::Type;
45
46#[derive(Clone)]
47pub struct ConsumerConfig {
48 pub(crate) consumer_group: CheetahString,
49 pub(crate) topic: CheetahString,
50 pub(crate) sub_expression: CheetahString,
51 pub(crate) message_model: MessageModel,
52 pub(crate) consume_from_where: ConsumeFromWhere,
53 pub(crate) consume_timestamp: Option<CheetahString>,
54 pub(crate) allocate_message_queue_strategy: Option<Arc<dyn AllocateMessageQueueStrategy>>,
55 pub(crate) subscription: ArcMut<HashMap<CheetahString, CheetahString>>,
57 pub(crate) message_listener: Option<ArcMut<MessageListener>>,
58 pub(crate) message_queue_listener: Option<ArcMessageQueueListener>,
59 pub(crate) consume_thread_min: u32,
60 pub(crate) consume_thread_max: u32,
61 pub(crate) adjust_thread_pool_nums_threshold: u64,
62 pub(crate) consume_concurrently_max_span: u32,
63 pub(crate) pull_threshold_for_queue: u32,
64 pub(crate) pop_threshold_for_queue: u32,
65 pub(crate) pull_threshold_size_for_queue: u32,
66 pub(crate) pull_threshold_for_topic: i32,
67 pub(crate) pull_threshold_size_for_topic: i32,
68 pub(crate) pull_interval: u64,
69 pub(crate) consume_message_batch_max_size: u32,
70 pub(crate) pull_batch_size: u32,
71 pub(crate) pull_batch_size_in_bytes: u32,
72 pub(crate) post_subscription_when_pull: bool,
73 pub(crate) unit_mode: bool,
74 pub(crate) max_reconsume_times: i32,
75 pub(crate) suspend_current_queue_time_millis: u64,
76 pub(crate) consume_timeout: u64,
77 pub(crate) pop_invisible_time: u64,
78 pub(crate) pop_batch_nums: u32,
79 pub(crate) await_termination_millis_when_shutdown: u64,
80 pub(crate) trace_dispatcher: Option<ArcTraceDispatcher>,
81 pub(crate) client_rebalance: bool,
82 pub(crate) rpc_hook: Option<Arc<dyn RPCHook>>,
83}
84
85impl ConsumerConfig {
86 pub fn consumer_group(&self) -> &CheetahString {
87 &self.consumer_group
88 }
89
90 pub fn message_model(&self) -> MessageModel {
91 self.message_model
92 }
93
94 pub fn consume_from_where(&self) -> ConsumeFromWhere {
95 self.consume_from_where
96 }
97
98 pub fn consume_timestamp(&self) -> &Option<CheetahString> {
99 &self.consume_timestamp
100 }
101
102 pub fn allocate_message_queue_strategy(&self) -> Option<Arc<dyn AllocateMessageQueueStrategy>> {
103 self.allocate_message_queue_strategy.clone()
104 }
105
106 pub fn subscription(&self) -> &ArcMut<HashMap<CheetahString, CheetahString>> {
107 &self.subscription
108 }
109
110 pub fn consume_thread_min(&self) -> u32 {
119 self.consume_thread_min
120 }
121
122 pub fn consume_thread_max(&self) -> u32 {
123 self.consume_thread_max
124 }
125
126 pub fn adjust_thread_pool_nums_threshold(&self) -> u64 {
127 self.adjust_thread_pool_nums_threshold
128 }
129
130 pub fn consume_concurrently_max_span(&self) -> u32 {
131 self.consume_concurrently_max_span
132 }
133
134 pub fn pull_threshold_for_queue(&self) -> u32 {
135 self.pull_threshold_for_queue
136 }
137
138 pub fn pop_threshold_for_queue(&self) -> u32 {
139 self.pop_threshold_for_queue
140 }
141
142 pub fn pull_threshold_size_for_queue(&self) -> u32 {
143 self.pull_threshold_size_for_queue
144 }
145
146 pub fn pull_threshold_for_topic(&self) -> i32 {
147 self.pull_threshold_for_topic
148 }
149
150 pub fn pull_threshold_size_for_topic(&self) -> i32 {
151 self.pull_threshold_size_for_topic
152 }
153
154 pub fn pull_interval(&self) -> u64 {
155 self.pull_interval
156 }
157
158 pub fn consume_message_batch_max_size(&self) -> u32 {
159 self.consume_message_batch_max_size
160 }
161
162 pub fn pull_batch_size(&self) -> u32 {
163 self.pull_batch_size
164 }
165
166 pub fn pull_batch_size_in_bytes(&self) -> u32 {
167 self.pull_batch_size_in_bytes
168 }
169
170 pub fn post_subscription_when_pull(&self) -> bool {
171 self.post_subscription_when_pull
172 }
173
174 pub fn unit_mode(&self) -> bool {
175 self.unit_mode
176 }
177
178 pub fn max_reconsume_times(&self) -> i32 {
179 self.max_reconsume_times
180 }
181
182 pub fn suspend_current_queue_time_millis(&self) -> u64 {
183 self.suspend_current_queue_time_millis
184 }
185
186 pub fn consume_timeout(&self) -> u64 {
187 self.consume_timeout
188 }
189
190 pub fn pop_invisible_time(&self) -> u64 {
191 self.pop_invisible_time
192 }
193
194 pub fn pop_batch_nums(&self) -> u32 {
195 self.pop_batch_nums
196 }
197
198 pub fn await_termination_millis_when_shutdown(&self) -> u64 {
199 self.await_termination_millis_when_shutdown
200 }
201
202 pub fn trace_dispatcher(&self) -> &Option<ArcTraceDispatcher> {
203 &self.trace_dispatcher
204 }
205
206 pub fn client_rebalance(&self) -> bool {
207 self.client_rebalance
208 }
209
210 pub fn rpc_hook(&self) -> &Option<Arc<dyn RPCHook>> {
211 &self.rpc_hook
212 }
213
214 pub fn set_consumer_group(&mut self, consumer_group: CheetahString) {
215 self.consumer_group = consumer_group;
216 }
217
218 pub fn set_message_model(&mut self, message_model: MessageModel) {
219 self.message_model = message_model;
220 }
221
222 pub fn set_consume_from_where(&mut self, consume_from_where: ConsumeFromWhere) {
223 self.consume_from_where = consume_from_where;
224 }
225
226 pub fn set_consume_timestamp(&mut self, consume_timestamp: Option<CheetahString>) {
227 self.consume_timestamp = consume_timestamp;
228 }
229
230 pub fn set_allocate_message_queue_strategy(
231 &mut self,
232 allocate_message_queue_strategy: Arc<dyn AllocateMessageQueueStrategy>,
233 ) {
234 self.allocate_message_queue_strategy = Some(allocate_message_queue_strategy);
235 }
236
237 pub fn set_subscription(&mut self, subscription: ArcMut<HashMap<CheetahString, CheetahString>>) {
242 self.subscription = subscription;
243 }
244
245 pub fn set_message_queue_listener(&mut self, message_queue_listener: Option<ArcMessageQueueListener>) {
253 self.message_queue_listener = message_queue_listener;
254 }
255
256 pub fn set_consume_thread_min(&mut self, consume_thread_min: u32) {
257 self.consume_thread_min = consume_thread_min;
258 }
259
260 pub fn set_consume_thread_max(&mut self, consume_thread_max: u32) {
261 self.consume_thread_max = consume_thread_max;
262 }
263
264 pub fn set_adjust_thread_pool_nums_threshold(&mut self, adjust_thread_pool_nums_threshold: u64) {
265 self.adjust_thread_pool_nums_threshold = adjust_thread_pool_nums_threshold;
266 }
267
268 pub fn set_consume_concurrently_max_span(&mut self, consume_concurrently_max_span: u32) {
269 self.consume_concurrently_max_span = consume_concurrently_max_span;
270 }
271
272 pub fn set_pull_threshold_for_queue(&mut self, pull_threshold_for_queue: u32) {
273 self.pull_threshold_for_queue = pull_threshold_for_queue;
274 }
275
276 pub fn set_pop_threshold_for_queue(&mut self, pop_threshold_for_queue: u32) {
277 self.pop_threshold_for_queue = pop_threshold_for_queue;
278 }
279
280 pub fn set_pull_threshold_size_for_queue(&mut self, pull_threshold_size_for_queue: u32) {
281 self.pull_threshold_size_for_queue = pull_threshold_size_for_queue;
282 }
283
284 pub fn set_pull_threshold_for_topic(&mut self, pull_threshold_for_topic: i32) {
285 self.pull_threshold_for_topic = pull_threshold_for_topic;
286 }
287
288 pub fn set_pull_threshold_size_for_topic(&mut self, pull_threshold_size_for_topic: i32) {
289 self.pull_threshold_size_for_topic = pull_threshold_size_for_topic;
290 }
291
292 pub fn set_pull_interval(&mut self, pull_interval: u64) {
293 self.pull_interval = pull_interval;
294 }
295
296 pub fn set_consume_message_batch_max_size(&mut self, consume_message_batch_max_size: u32) {
297 self.consume_message_batch_max_size = consume_message_batch_max_size;
298 }
299
300 pub fn set_pull_batch_size(&mut self, pull_batch_size: u32) {
301 self.pull_batch_size = pull_batch_size;
302 }
303
304 pub fn set_pull_batch_size_in_bytes(&mut self, pull_batch_size_in_bytes: u32) {
305 self.pull_batch_size_in_bytes = pull_batch_size_in_bytes;
306 }
307
308 pub fn set_post_subscription_when_pull(&mut self, post_subscription_when_pull: bool) {
309 self.post_subscription_when_pull = post_subscription_when_pull;
310 }
311
312 pub fn set_unit_mode(&mut self, unit_mode: bool) {
313 self.unit_mode = unit_mode;
314 }
315
316 pub fn set_max_reconsume_times(&mut self, max_reconsume_times: i32) {
317 self.max_reconsume_times = max_reconsume_times;
318 }
319
320 pub fn set_suspend_current_queue_time_millis(&mut self, suspend_current_queue_time_millis: u64) {
321 self.suspend_current_queue_time_millis = suspend_current_queue_time_millis;
322 }
323
324 pub fn set_consume_timeout(&mut self, consume_timeout: u64) {
325 self.consume_timeout = consume_timeout;
326 }
327
328 pub fn set_pop_invisible_time(&mut self, pop_invisible_time: u64) {
329 self.pop_invisible_time = pop_invisible_time;
330 }
331
332 pub fn set_pop_batch_nums(&mut self, pop_batch_nums: u32) {
333 self.pop_batch_nums = pop_batch_nums;
334 }
335
336 pub fn set_await_termination_millis_when_shutdown(&mut self, await_termination_millis_when_shutdown: u64) {
337 self.await_termination_millis_when_shutdown = await_termination_millis_when_shutdown;
338 }
339
340 pub fn set_trace_dispatcher(&mut self, trace_dispatcher: Option<ArcTraceDispatcher>) {
341 self.trace_dispatcher = trace_dispatcher;
342 }
343
344 pub fn set_client_rebalance(&mut self, client_rebalance: bool) {
345 self.client_rebalance = client_rebalance;
346 }
347
348 pub fn set_rpc_hook(&mut self, rpc_hook: Option<Arc<dyn RPCHook>>) {
349 self.rpc_hook = rpc_hook;
350 }
351}
352
353impl Default for ConsumerConfig {
354 fn default() -> Self {
355 ConsumerConfig {
356 consumer_group: CheetahString::new(),
357 topic: CheetahString::new(),
358 sub_expression: CheetahString::new(),
359 message_model: MessageModel::Clustering,
360 consume_from_where: ConsumeFromWhere::ConsumeFromLastOffset,
361 consume_timestamp: Some(CheetahString::from_string(util_all::time_millis_to_human_string3(
362 (current_millis() - (1000 * 60 * 30)) as i64,
363 ))),
364 allocate_message_queue_strategy: Some(Arc::new(AllocateMessageQueueAveragely)),
365 subscription: ArcMut::new(HashMap::new()),
366 message_listener: None,
367 message_queue_listener: None,
368
369 consume_thread_min: 20,
370 consume_thread_max: 64,
371 adjust_thread_pool_nums_threshold: 100_000,
372 consume_concurrently_max_span: 2000,
373 pull_threshold_for_queue: 1000,
374 pop_threshold_for_queue: 96,
375 pull_threshold_size_for_queue: 100,
376 pull_threshold_for_topic: -1,
377 pull_threshold_size_for_topic: -1,
378 pull_interval: 0,
379 consume_message_batch_max_size: 1,
380 pull_batch_size: 32,
381 pull_batch_size_in_bytes: 256 * 1024,
382 post_subscription_when_pull: false,
383 unit_mode: false,
384 max_reconsume_times: -1,
385 suspend_current_queue_time_millis: 1000,
386 consume_timeout: 15,
387 pop_invisible_time: 60000,
388 pop_batch_nums: 32,
389 await_termination_millis_when_shutdown: 0,
390 trace_dispatcher: None,
391 client_rebalance: true,
392 rpc_hook: None,
393 }
394 }
395}
396
397pub struct DefaultMQPushConsumer {
398 client_config: ClientConfig,
399 consumer_config: ArcMut<ConsumerConfig>,
400 pub(crate) default_mqpush_consumer_impl: Option<ArcMut<DefaultMQPushConsumerImpl>>,
401}
402
403impl MQConsumer for DefaultMQPushConsumer {
404 async fn send_message_back(
405 &mut self,
406 msg: MessageExt,
407 delay_level: i32,
408 broker_name: &str,
409 ) -> rocketmq_error::RocketMQResult<()> {
410 todo!()
411 }
412
413 async fn fetch_subscribe_message_queues(
414 &mut self,
415 topic: &str,
416 ) -> rocketmq_error::RocketMQResult<Vec<MessageQueue>> {
417 todo!()
418 }
419}
420
421impl MQPushConsumer for DefaultMQPushConsumer {
422 async fn start(&mut self) -> rocketmq_error::RocketMQResult<()> {
423 let consumer_group = NamespaceUtil::wrap_namespace(
424 self.client_config.get_namespace().unwrap_or_default().as_str(),
425 self.consumer_config.consumer_group.as_str(),
426 );
427 self.set_consumer_group(consumer_group.as_str());
428 self.default_mqpush_consumer_impl.as_mut().unwrap().start().await?;
429
430 if self.client_config.enable_trace {
431 let dispatcher = AsyncTraceDispatcher::new(
432 self.consumer_config.consumer_group.as_str(),
433 Type::Consume,
434 20, self.client_config.trace_topic.clone().unwrap().as_str(),
436 None, );
438 dispatcher.set_host_consumer(self.default_mqpush_consumer_impl.as_ref().unwrap().clone());
439 dispatcher.set_namespace_v2(self.client_config.namespace_v2.clone());
440 let dispatcher: ArcTraceDispatcher = Arc::new(dispatcher);
441 self.consumer_config.trace_dispatcher = Some(dispatcher.clone());
442 let default_mqpush_consumer_impl = self.default_mqpush_consumer_impl.as_mut().unwrap();
443 default_mqpush_consumer_impl
444 .register_consume_message_hook(ConsumeMessageTraceHookImpl::new(dispatcher.clone()));
445 }
446
447 if let Some(ref rpc_hook) = self.consumer_config.trace_dispatcher {
448 unimplemented!("trace hook");
449 }
450
451 Ok(())
452 }
453
454 async fn shutdown(&mut self) {
455 todo!()
456 }
457
458 fn register_message_listener_concurrently<ML>(&mut self, message_listener: ML)
459 where
460 ML: MessageListenerConcurrently + Send + Sync + 'static,
461 {
462 let message_listener = MessageListener {
463 message_listener_concurrently: Some(Arc::new(message_listener)),
464 message_listener_orderly: None,
465 };
466 self.consumer_config.message_listener = Some(ArcMut::new(message_listener));
467 self.default_mqpush_consumer_impl
468 .as_mut()
469 .unwrap()
470 .register_message_listener(self.consumer_config.message_listener.clone());
471 }
472
473 fn register_message_listener_orderly<ML>(&mut self, message_listener: ML)
474 where
475 ML: MessageListenerOrderly + Send + Sync + 'static,
476 {
477 let message_listener = MessageListener {
478 message_listener_concurrently: None,
479 message_listener_orderly: Some(Arc::new(message_listener)),
480 };
481 self.consumer_config.message_listener = Some(ArcMut::new(message_listener));
482 self.default_mqpush_consumer_impl
483 .as_mut()
484 .unwrap()
485 .register_message_listener(self.consumer_config.message_listener.clone());
486 }
487
488 async fn subscribe(
489 &mut self,
490 topic: impl Into<CheetahString>,
491 sub_expression: impl Into<CheetahString>,
492 ) -> rocketmq_error::RocketMQResult<()> {
493 let topic = topic.into();
494 let sub_expression = sub_expression.into();
495
496 self.default_mqpush_consumer_impl
497 .as_mut()
498 .expect("default_mqpush_consumer_impl is not initialized")
499 .subscribe(topic, sub_expression)
500 .await
501 }
502
503 async fn subscribe_with_selector(
504 &mut self,
505 topic: &str,
506 selector: Option<MessageSelector>,
507 ) -> rocketmq_error::RocketMQResult<()> {
508 todo!()
509 }
510
511 async fn unsubscribe(&mut self, topic: &str) {
512 if let Some(ref mut default_mqpush_consumer_impl) = self.default_mqpush_consumer_impl {
513 default_mqpush_consumer_impl.unsubscribe(topic).await;
514 }
515 }
516
517 async fn suspend(&self) {
518 if let Some(ref default_mqpush_consumer_impl) = self.default_mqpush_consumer_impl {
519 default_mqpush_consumer_impl.suspend().await;
520 }
521 }
522
523 async fn resume(&self) {
524 if let Some(ref default_mqpush_consumer_impl) = self.default_mqpush_consumer_impl {
525 default_mqpush_consumer_impl.resume().await;
526 }
527 }
528}
529
530impl DefaultMQPushConsumer {
531 pub fn builder() -> DefaultMQPushConsumerBuilder {
532 DefaultMQPushConsumerBuilder::default()
533 }
534
535 #[inline]
536 pub fn set_consumer_group(&mut self, consumer_group: impl Into<CheetahString>) {
537 self.consumer_config.consumer_group = consumer_group.into();
538 }
539
540 pub fn new(client_config: ClientConfig, consumer_config: ConsumerConfig) -> DefaultMQPushConsumer {
541 let consumer_config = ArcMut::new(consumer_config);
542 let mut default_mqpush_consumer_impl = ArcMut::new(DefaultMQPushConsumerImpl::new(
543 client_config.clone(),
544 consumer_config.clone(),
545 consumer_config.rpc_hook.clone(),
546 ));
547 let wrapper = default_mqpush_consumer_impl.clone();
548 default_mqpush_consumer_impl.set_default_mqpush_consumer_impl(wrapper);
549 DefaultMQPushConsumer {
550 client_config,
551 consumer_config,
552 default_mqpush_consumer_impl: Some(default_mqpush_consumer_impl),
553 }
554 }
555
556 pub fn set_name_server_addr(&mut self, name_server_addr: CheetahString) {
557 self.client_config.namesrv_addr = Some(name_server_addr);
558 self.client_config
559 .namespace_initialized
560 .store(false, std::sync::atomic::Ordering::Release);
561 }
562
563 pub fn set_consume_from_where(&mut self, consume_from_where: ConsumeFromWhere) {
564 self.consumer_config.consume_from_where = consume_from_where;
565 }
566}