Skip to main content

rocketmq_client_rust/consumer/
default_mq_push_consumer.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use cheetah_string::CheetahString;
19use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere;
20use rocketmq_common::common::message::message_ext::MessageExt;
21use rocketmq_common::common::message::message_queue::MessageQueue;
22use rocketmq_common::utils::util_all;
23use rocketmq_common::TimeUtils::current_millis;
24use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
25use rocketmq_remoting::protocol::namespace_util::NamespaceUtil;
26use rocketmq_remoting::runtime::RPCHook;
27use rocketmq_rust::ArcMut;
28
29use crate::base::client_config::ClientConfig;
30use crate::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy;
31use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPushConsumerImpl;
32use crate::consumer::default_mq_push_consumer_builder::DefaultMQPushConsumerBuilder;
33use crate::consumer::listener::message_listener::MessageListener;
34use crate::consumer::listener::message_listener_concurrently::MessageListenerConcurrently;
35use crate::consumer::listener::message_listener_orderly::MessageListenerOrderly;
36use crate::consumer::message_queue_listener::ArcMessageQueueListener;
37use crate::consumer::message_selector::MessageSelector;
38use crate::consumer::mq_consumer::MQConsumer;
39use crate::consumer::mq_push_consumer::MQPushConsumer;
40use crate::consumer::rebalance_strategy::allocate_message_queue_averagely::AllocateMessageQueueAveragely;
41use crate::trace::async_trace_dispatcher::AsyncTraceDispatcher;
42use crate::trace::hook::consume_message_trace_hook_impl::ConsumeMessageTraceHookImpl;
43use crate::trace::trace_dispatcher::ArcTraceDispatcher;
44use crate::trace::trace_dispatcher::Type;
45
46#[derive(Clone)]
47pub struct ConsumerConfig {
48    pub(crate) consumer_group: CheetahString,
49    pub(crate) topic: CheetahString,
50    pub(crate) sub_expression: CheetahString,
51    pub(crate) message_model: MessageModel,
52    pub(crate) consume_from_where: ConsumeFromWhere,
53    pub(crate) consume_timestamp: Option<CheetahString>,
54    pub(crate) allocate_message_queue_strategy: Option<Arc<dyn AllocateMessageQueueStrategy>>,
55    //this field will be removed in a certain version after April 5, 2020
56    pub(crate) subscription: ArcMut<HashMap<CheetahString, CheetahString>>,
57    pub(crate) message_listener: Option<ArcMut<MessageListener>>,
58    pub(crate) message_queue_listener: Option<ArcMessageQueueListener>,
59    pub(crate) consume_thread_min: u32,
60    pub(crate) consume_thread_max: u32,
61    pub(crate) adjust_thread_pool_nums_threshold: u64,
62    pub(crate) consume_concurrently_max_span: u32,
63    pub(crate) pull_threshold_for_queue: u32,
64    pub(crate) pop_threshold_for_queue: u32,
65    pub(crate) pull_threshold_size_for_queue: u32,
66    pub(crate) pull_threshold_for_topic: i32,
67    pub(crate) pull_threshold_size_for_topic: i32,
68    pub(crate) pull_interval: u64,
69    pub(crate) consume_message_batch_max_size: u32,
70    pub(crate) pull_batch_size: u32,
71    pub(crate) pull_batch_size_in_bytes: u32,
72    pub(crate) post_subscription_when_pull: bool,
73    pub(crate) unit_mode: bool,
74    pub(crate) max_reconsume_times: i32,
75    pub(crate) suspend_current_queue_time_millis: u64,
76    pub(crate) consume_timeout: u64,
77    pub(crate) pop_invisible_time: u64,
78    pub(crate) pop_batch_nums: u32,
79    pub(crate) await_termination_millis_when_shutdown: u64,
80    pub(crate) trace_dispatcher: Option<ArcTraceDispatcher>,
81    pub(crate) client_rebalance: bool,
82    pub(crate) rpc_hook: Option<Arc<dyn RPCHook>>,
83}
84
85impl ConsumerConfig {
86    pub fn consumer_group(&self) -> &CheetahString {
87        &self.consumer_group
88    }
89
90    pub fn message_model(&self) -> MessageModel {
91        self.message_model
92    }
93
94    pub fn consume_from_where(&self) -> ConsumeFromWhere {
95        self.consume_from_where
96    }
97
98    pub fn consume_timestamp(&self) -> &Option<CheetahString> {
99        &self.consume_timestamp
100    }
101
102    pub fn allocate_message_queue_strategy(&self) -> Option<Arc<dyn AllocateMessageQueueStrategy>> {
103        self.allocate_message_queue_strategy.clone()
104    }
105
106    pub fn subscription(&self) -> &ArcMut<HashMap<CheetahString, CheetahString>> {
107        &self.subscription
108    }
109
110    /*    pub fn message_listener(&self) -> &Option<ArcMut<MessageListener>> {
111        &self.message_listener
112    }*/
113
114    /*    pub fn message_queue_listener(&self) -> &Option<ArcMessageQueueListener> {
115        &self.message_queue_listener
116    }*/
117
118    pub fn consume_thread_min(&self) -> u32 {
119        self.consume_thread_min
120    }
121
122    pub fn consume_thread_max(&self) -> u32 {
123        self.consume_thread_max
124    }
125
126    pub fn adjust_thread_pool_nums_threshold(&self) -> u64 {
127        self.adjust_thread_pool_nums_threshold
128    }
129
130    pub fn consume_concurrently_max_span(&self) -> u32 {
131        self.consume_concurrently_max_span
132    }
133
134    pub fn pull_threshold_for_queue(&self) -> u32 {
135        self.pull_threshold_for_queue
136    }
137
138    pub fn pop_threshold_for_queue(&self) -> u32 {
139        self.pop_threshold_for_queue
140    }
141
142    pub fn pull_threshold_size_for_queue(&self) -> u32 {
143        self.pull_threshold_size_for_queue
144    }
145
146    pub fn pull_threshold_for_topic(&self) -> i32 {
147        self.pull_threshold_for_topic
148    }
149
150    pub fn pull_threshold_size_for_topic(&self) -> i32 {
151        self.pull_threshold_size_for_topic
152    }
153
154    pub fn pull_interval(&self) -> u64 {
155        self.pull_interval
156    }
157
158    pub fn consume_message_batch_max_size(&self) -> u32 {
159        self.consume_message_batch_max_size
160    }
161
162    pub fn pull_batch_size(&self) -> u32 {
163        self.pull_batch_size
164    }
165
166    pub fn pull_batch_size_in_bytes(&self) -> u32 {
167        self.pull_batch_size_in_bytes
168    }
169
170    pub fn post_subscription_when_pull(&self) -> bool {
171        self.post_subscription_when_pull
172    }
173
174    pub fn unit_mode(&self) -> bool {
175        self.unit_mode
176    }
177
178    pub fn max_reconsume_times(&self) -> i32 {
179        self.max_reconsume_times
180    }
181
182    pub fn suspend_current_queue_time_millis(&self) -> u64 {
183        self.suspend_current_queue_time_millis
184    }
185
186    pub fn consume_timeout(&self) -> u64 {
187        self.consume_timeout
188    }
189
190    pub fn pop_invisible_time(&self) -> u64 {
191        self.pop_invisible_time
192    }
193
194    pub fn pop_batch_nums(&self) -> u32 {
195        self.pop_batch_nums
196    }
197
198    pub fn await_termination_millis_when_shutdown(&self) -> u64 {
199        self.await_termination_millis_when_shutdown
200    }
201
202    pub fn trace_dispatcher(&self) -> &Option<ArcTraceDispatcher> {
203        &self.trace_dispatcher
204    }
205
206    pub fn client_rebalance(&self) -> bool {
207        self.client_rebalance
208    }
209
210    pub fn rpc_hook(&self) -> &Option<Arc<dyn RPCHook>> {
211        &self.rpc_hook
212    }
213
214    pub fn set_consumer_group(&mut self, consumer_group: CheetahString) {
215        self.consumer_group = consumer_group;
216    }
217
218    pub fn set_message_model(&mut self, message_model: MessageModel) {
219        self.message_model = message_model;
220    }
221
222    pub fn set_consume_from_where(&mut self, consume_from_where: ConsumeFromWhere) {
223        self.consume_from_where = consume_from_where;
224    }
225
226    pub fn set_consume_timestamp(&mut self, consume_timestamp: Option<CheetahString>) {
227        self.consume_timestamp = consume_timestamp;
228    }
229
230    pub fn set_allocate_message_queue_strategy(
231        &mut self,
232        allocate_message_queue_strategy: Arc<dyn AllocateMessageQueueStrategy>,
233    ) {
234        self.allocate_message_queue_strategy = Some(allocate_message_queue_strategy);
235    }
236
237    /**
238     * This method will be removed in a certain version after April 5, 2020, so please do not
239     * use this method.
240     */
241    pub fn set_subscription(&mut self, subscription: ArcMut<HashMap<CheetahString, CheetahString>>) {
242        self.subscription = subscription;
243    }
244
245    /*    pub fn set_message_listener(
246        &mut self,
247        message_listener: Option<ArcMut<MessageListener>>,
248    ) {
249        self.message_listener = message_listener;
250    }*/
251
252    pub fn set_message_queue_listener(&mut self, message_queue_listener: Option<ArcMessageQueueListener>) {
253        self.message_queue_listener = message_queue_listener;
254    }
255
256    pub fn set_consume_thread_min(&mut self, consume_thread_min: u32) {
257        self.consume_thread_min = consume_thread_min;
258    }
259
260    pub fn set_consume_thread_max(&mut self, consume_thread_max: u32) {
261        self.consume_thread_max = consume_thread_max;
262    }
263
264    pub fn set_adjust_thread_pool_nums_threshold(&mut self, adjust_thread_pool_nums_threshold: u64) {
265        self.adjust_thread_pool_nums_threshold = adjust_thread_pool_nums_threshold;
266    }
267
268    pub fn set_consume_concurrently_max_span(&mut self, consume_concurrently_max_span: u32) {
269        self.consume_concurrently_max_span = consume_concurrently_max_span;
270    }
271
272    pub fn set_pull_threshold_for_queue(&mut self, pull_threshold_for_queue: u32) {
273        self.pull_threshold_for_queue = pull_threshold_for_queue;
274    }
275
276    pub fn set_pop_threshold_for_queue(&mut self, pop_threshold_for_queue: u32) {
277        self.pop_threshold_for_queue = pop_threshold_for_queue;
278    }
279
280    pub fn set_pull_threshold_size_for_queue(&mut self, pull_threshold_size_for_queue: u32) {
281        self.pull_threshold_size_for_queue = pull_threshold_size_for_queue;
282    }
283
284    pub fn set_pull_threshold_for_topic(&mut self, pull_threshold_for_topic: i32) {
285        self.pull_threshold_for_topic = pull_threshold_for_topic;
286    }
287
288    pub fn set_pull_threshold_size_for_topic(&mut self, pull_threshold_size_for_topic: i32) {
289        self.pull_threshold_size_for_topic = pull_threshold_size_for_topic;
290    }
291
292    pub fn set_pull_interval(&mut self, pull_interval: u64) {
293        self.pull_interval = pull_interval;
294    }
295
296    pub fn set_consume_message_batch_max_size(&mut self, consume_message_batch_max_size: u32) {
297        self.consume_message_batch_max_size = consume_message_batch_max_size;
298    }
299
300    pub fn set_pull_batch_size(&mut self, pull_batch_size: u32) {
301        self.pull_batch_size = pull_batch_size;
302    }
303
304    pub fn set_pull_batch_size_in_bytes(&mut self, pull_batch_size_in_bytes: u32) {
305        self.pull_batch_size_in_bytes = pull_batch_size_in_bytes;
306    }
307
308    pub fn set_post_subscription_when_pull(&mut self, post_subscription_when_pull: bool) {
309        self.post_subscription_when_pull = post_subscription_when_pull;
310    }
311
312    pub fn set_unit_mode(&mut self, unit_mode: bool) {
313        self.unit_mode = unit_mode;
314    }
315
316    pub fn set_max_reconsume_times(&mut self, max_reconsume_times: i32) {
317        self.max_reconsume_times = max_reconsume_times;
318    }
319
320    pub fn set_suspend_current_queue_time_millis(&mut self, suspend_current_queue_time_millis: u64) {
321        self.suspend_current_queue_time_millis = suspend_current_queue_time_millis;
322    }
323
324    pub fn set_consume_timeout(&mut self, consume_timeout: u64) {
325        self.consume_timeout = consume_timeout;
326    }
327
328    pub fn set_pop_invisible_time(&mut self, pop_invisible_time: u64) {
329        self.pop_invisible_time = pop_invisible_time;
330    }
331
332    pub fn set_pop_batch_nums(&mut self, pop_batch_nums: u32) {
333        self.pop_batch_nums = pop_batch_nums;
334    }
335
336    pub fn set_await_termination_millis_when_shutdown(&mut self, await_termination_millis_when_shutdown: u64) {
337        self.await_termination_millis_when_shutdown = await_termination_millis_when_shutdown;
338    }
339
340    pub fn set_trace_dispatcher(&mut self, trace_dispatcher: Option<ArcTraceDispatcher>) {
341        self.trace_dispatcher = trace_dispatcher;
342    }
343
344    pub fn set_client_rebalance(&mut self, client_rebalance: bool) {
345        self.client_rebalance = client_rebalance;
346    }
347
348    pub fn set_rpc_hook(&mut self, rpc_hook: Option<Arc<dyn RPCHook>>) {
349        self.rpc_hook = rpc_hook;
350    }
351}
352
353impl Default for ConsumerConfig {
354    fn default() -> Self {
355        ConsumerConfig {
356            consumer_group: CheetahString::new(),
357            topic: CheetahString::new(),
358            sub_expression: CheetahString::new(),
359            message_model: MessageModel::Clustering,
360            consume_from_where: ConsumeFromWhere::ConsumeFromLastOffset,
361            consume_timestamp: Some(CheetahString::from_string(util_all::time_millis_to_human_string3(
362                (current_millis() - (1000 * 60 * 30)) as i64,
363            ))),
364            allocate_message_queue_strategy: Some(Arc::new(AllocateMessageQueueAveragely)),
365            subscription: ArcMut::new(HashMap::new()),
366            message_listener: None,
367            message_queue_listener: None,
368
369            consume_thread_min: 20,
370            consume_thread_max: 64,
371            adjust_thread_pool_nums_threshold: 100_000,
372            consume_concurrently_max_span: 2000,
373            pull_threshold_for_queue: 1000,
374            pop_threshold_for_queue: 96,
375            pull_threshold_size_for_queue: 100,
376            pull_threshold_for_topic: -1,
377            pull_threshold_size_for_topic: -1,
378            pull_interval: 0,
379            consume_message_batch_max_size: 1,
380            pull_batch_size: 32,
381            pull_batch_size_in_bytes: 256 * 1024,
382            post_subscription_when_pull: false,
383            unit_mode: false,
384            max_reconsume_times: -1,
385            suspend_current_queue_time_millis: 1000,
386            consume_timeout: 15,
387            pop_invisible_time: 60000,
388            pop_batch_nums: 32,
389            await_termination_millis_when_shutdown: 0,
390            trace_dispatcher: None,
391            client_rebalance: true,
392            rpc_hook: None,
393        }
394    }
395}
396
397pub struct DefaultMQPushConsumer {
398    client_config: ClientConfig,
399    consumer_config: ArcMut<ConsumerConfig>,
400    pub(crate) default_mqpush_consumer_impl: Option<ArcMut<DefaultMQPushConsumerImpl>>,
401}
402
403impl MQConsumer for DefaultMQPushConsumer {
404    async fn send_message_back(
405        &mut self,
406        msg: MessageExt,
407        delay_level: i32,
408        broker_name: &str,
409    ) -> rocketmq_error::RocketMQResult<()> {
410        todo!()
411    }
412
413    async fn fetch_subscribe_message_queues(
414        &mut self,
415        topic: &str,
416    ) -> rocketmq_error::RocketMQResult<Vec<MessageQueue>> {
417        todo!()
418    }
419}
420
421impl MQPushConsumer for DefaultMQPushConsumer {
422    async fn start(&mut self) -> rocketmq_error::RocketMQResult<()> {
423        let consumer_group = NamespaceUtil::wrap_namespace(
424            self.client_config.get_namespace().unwrap_or_default().as_str(),
425            self.consumer_config.consumer_group.as_str(),
426        );
427        self.set_consumer_group(consumer_group.as_str());
428        self.default_mqpush_consumer_impl.as_mut().unwrap().start().await?;
429
430        if self.client_config.enable_trace {
431            let dispatcher = AsyncTraceDispatcher::new(
432                self.consumer_config.consumer_group.as_str(),
433                Type::Consume,
434                20, // batch_num
435                self.client_config.trace_topic.clone().unwrap().as_str(),
436                None, // rpc_hook - convert if needed
437            );
438            dispatcher.set_host_consumer(self.default_mqpush_consumer_impl.as_ref().unwrap().clone());
439            dispatcher.set_namespace_v2(self.client_config.namespace_v2.clone());
440            let dispatcher: ArcTraceDispatcher = Arc::new(dispatcher);
441            self.consumer_config.trace_dispatcher = Some(dispatcher.clone());
442            let default_mqpush_consumer_impl = self.default_mqpush_consumer_impl.as_mut().unwrap();
443            default_mqpush_consumer_impl
444                .register_consume_message_hook(ConsumeMessageTraceHookImpl::new(dispatcher.clone()));
445        }
446
447        if let Some(ref rpc_hook) = self.consumer_config.trace_dispatcher {
448            unimplemented!("trace hook");
449        }
450
451        Ok(())
452    }
453
454    async fn shutdown(&mut self) {
455        todo!()
456    }
457
458    fn register_message_listener_concurrently<ML>(&mut self, message_listener: ML)
459    where
460        ML: MessageListenerConcurrently + Send + Sync + 'static,
461    {
462        let message_listener = MessageListener {
463            message_listener_concurrently: Some(Arc::new(message_listener)),
464            message_listener_orderly: None,
465        };
466        self.consumer_config.message_listener = Some(ArcMut::new(message_listener));
467        self.default_mqpush_consumer_impl
468            .as_mut()
469            .unwrap()
470            .register_message_listener(self.consumer_config.message_listener.clone());
471    }
472
473    fn register_message_listener_orderly<ML>(&mut self, message_listener: ML)
474    where
475        ML: MessageListenerOrderly + Send + Sync + 'static,
476    {
477        let message_listener = MessageListener {
478            message_listener_concurrently: None,
479            message_listener_orderly: Some(Arc::new(message_listener)),
480        };
481        self.consumer_config.message_listener = Some(ArcMut::new(message_listener));
482        self.default_mqpush_consumer_impl
483            .as_mut()
484            .unwrap()
485            .register_message_listener(self.consumer_config.message_listener.clone());
486    }
487
488    async fn subscribe(
489        &mut self,
490        topic: impl Into<CheetahString>,
491        sub_expression: impl Into<CheetahString>,
492    ) -> rocketmq_error::RocketMQResult<()> {
493        let topic = topic.into();
494        let sub_expression = sub_expression.into();
495
496        self.default_mqpush_consumer_impl
497            .as_mut()
498            .expect("default_mqpush_consumer_impl is not initialized")
499            .subscribe(topic, sub_expression)
500            .await
501    }
502
503    async fn subscribe_with_selector(
504        &mut self,
505        topic: &str,
506        selector: Option<MessageSelector>,
507    ) -> rocketmq_error::RocketMQResult<()> {
508        todo!()
509    }
510
511    async fn unsubscribe(&mut self, topic: &str) {
512        if let Some(ref mut default_mqpush_consumer_impl) = self.default_mqpush_consumer_impl {
513            default_mqpush_consumer_impl.unsubscribe(topic).await;
514        }
515    }
516
517    async fn suspend(&self) {
518        if let Some(ref default_mqpush_consumer_impl) = self.default_mqpush_consumer_impl {
519            default_mqpush_consumer_impl.suspend().await;
520        }
521    }
522
523    async fn resume(&self) {
524        if let Some(ref default_mqpush_consumer_impl) = self.default_mqpush_consumer_impl {
525            default_mqpush_consumer_impl.resume().await;
526        }
527    }
528}
529
530impl DefaultMQPushConsumer {
531    pub fn builder() -> DefaultMQPushConsumerBuilder {
532        DefaultMQPushConsumerBuilder::default()
533    }
534
535    #[inline]
536    pub fn set_consumer_group(&mut self, consumer_group: impl Into<CheetahString>) {
537        self.consumer_config.consumer_group = consumer_group.into();
538    }
539
540    pub fn new(client_config: ClientConfig, consumer_config: ConsumerConfig) -> DefaultMQPushConsumer {
541        let consumer_config = ArcMut::new(consumer_config);
542        let mut default_mqpush_consumer_impl = ArcMut::new(DefaultMQPushConsumerImpl::new(
543            client_config.clone(),
544            consumer_config.clone(),
545            consumer_config.rpc_hook.clone(),
546        ));
547        let wrapper = default_mqpush_consumer_impl.clone();
548        default_mqpush_consumer_impl.set_default_mqpush_consumer_impl(wrapper);
549        DefaultMQPushConsumer {
550            client_config,
551            consumer_config,
552            default_mqpush_consumer_impl: Some(default_mqpush_consumer_impl),
553        }
554    }
555
556    pub fn set_name_server_addr(&mut self, name_server_addr: CheetahString) {
557        self.client_config.namesrv_addr = Some(name_server_addr);
558        self.client_config
559            .namespace_initialized
560            .store(false, std::sync::atomic::Ordering::Release);
561    }
562
563    pub fn set_consume_from_where(&mut self, consume_from_where: ConsumeFromWhere) {
564        self.consumer_config.consume_from_where = consume_from_where;
565    }
566}