rocketmq_client_rust/consumer/
default_mq_push_consumer.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::collections::HashMap;
19use std::sync::Arc;
20use std::thread;
21
22use cheetah_string::CheetahString;
23use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere;
24use rocketmq_common::common::message::message_ext::MessageExt;
25use rocketmq_common::common::message::message_queue::MessageQueue;
26use rocketmq_common::utils::util_all;
27use rocketmq_common::TimeUtils::get_current_millis;
28use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
29use rocketmq_remoting::protocol::namespace_util::NamespaceUtil;
30use rocketmq_remoting::runtime::RPCHook;
31use rocketmq_rust::ArcMut;
32use tokio::runtime::Handle;
33
34use crate::base::client_config::ClientConfig;
35use crate::base::mq_admin::MQAdmin;
36use crate::base::query_result::QueryResult;
37use crate::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy;
38use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPushConsumerImpl;
39use crate::consumer::default_mq_push_consumer_builder::DefaultMQPushConsumerBuilder;
40use crate::consumer::listener::consume_concurrently_context::ConsumeConcurrentlyContext;
41use crate::consumer::listener::consume_concurrently_status::ConsumeConcurrentlyStatus;
42use crate::consumer::listener::consume_orderly_context::ConsumeOrderlyContext;
43use crate::consumer::listener::consume_orderly_status::ConsumeOrderlyStatus;
44use crate::consumer::listener::message_listener::MessageListener;
45use crate::consumer::listener::message_listener_concurrently::MessageListenerConcurrently;
46use crate::consumer::listener::message_listener_orderly::MessageListenerOrderly;
47use crate::consumer::message_queue_listener::MessageQueueListener;
48use crate::consumer::message_selector::MessageSelector;
49use crate::consumer::mq_consumer::MQConsumer;
50use crate::consumer::mq_push_consumer::MQPushConsumer;
51use crate::consumer::rebalance_strategy::allocate_message_queue_averagely::AllocateMessageQueueAveragely;
52use crate::trace::async_trace_dispatcher::AsyncTraceDispatcher;
53use crate::trace::hook::consume_message_trace_hook_impl::ConsumeMessageTraceHookImpl;
54use crate::trace::trace_dispatcher::TraceDispatcher;
55use crate::trace::trace_dispatcher::Type;
56
57#[derive(Clone)]
58pub struct ConsumerConfig {
59    pub(crate) consumer_group: CheetahString,
60    pub(crate) topic: CheetahString,
61    pub(crate) sub_expression: CheetahString,
62    pub(crate) message_model: MessageModel,
63    pub(crate) consume_from_where: ConsumeFromWhere,
64    pub(crate) consume_timestamp: Option<CheetahString>,
65    pub(crate) allocate_message_queue_strategy: Option<Arc<dyn AllocateMessageQueueStrategy>>,
66    //this field will be removed in a certain version after April 5, 2020
67    pub(crate) subscription: ArcMut<HashMap<CheetahString, CheetahString>>,
68    pub(crate) message_listener: Option<ArcMut<MessageListener>>,
69    pub(crate) message_queue_listener: Option<Arc<Box<dyn MessageQueueListener>>>,
70    pub(crate) consume_thread_min: u32,
71    pub(crate) consume_thread_max: u32,
72    pub(crate) adjust_thread_pool_nums_threshold: u64,
73    pub(crate) consume_concurrently_max_span: u32,
74    pub(crate) pull_threshold_for_queue: u32,
75    pub(crate) pop_threshold_for_queue: u32,
76    pub(crate) pull_threshold_size_for_queue: u32,
77    pub(crate) pull_threshold_for_topic: i32,
78    pub(crate) pull_threshold_size_for_topic: i32,
79    pub(crate) pull_interval: u64,
80    pub(crate) consume_message_batch_max_size: u32,
81    pub(crate) pull_batch_size: u32,
82    pub(crate) pull_batch_size_in_bytes: u32,
83    pub(crate) post_subscription_when_pull: bool,
84    pub(crate) unit_mode: bool,
85    pub(crate) max_reconsume_times: i32,
86    pub(crate) suspend_current_queue_time_millis: u64,
87    pub(crate) consume_timeout: u64,
88    pub(crate) pop_invisible_time: u64,
89    pub(crate) pop_batch_nums: u32,
90    pub(crate) await_termination_millis_when_shutdown: u64,
91    pub(crate) trace_dispatcher: Option<Arc<Box<dyn TraceDispatcher + Send + Sync>>>,
92    pub(crate) client_rebalance: bool,
93    pub(crate) rpc_hook: Option<Arc<Box<dyn RPCHook>>>,
94}
95
96impl ConsumerConfig {
97    pub fn consumer_group(&self) -> &CheetahString {
98        &self.consumer_group
99    }
100
101    pub fn message_model(&self) -> MessageModel {
102        self.message_model
103    }
104
105    pub fn consume_from_where(&self) -> ConsumeFromWhere {
106        self.consume_from_where
107    }
108
109    pub fn consume_timestamp(&self) -> &Option<CheetahString> {
110        &self.consume_timestamp
111    }
112
113    pub fn allocate_message_queue_strategy(&self) -> Option<Arc<dyn AllocateMessageQueueStrategy>> {
114        self.allocate_message_queue_strategy.clone()
115    }
116
117    pub fn subscription(&self) -> &ArcMut<HashMap<CheetahString, CheetahString>> {
118        &self.subscription
119    }
120
121    /*    pub fn message_listener(&self) -> &Option<ArcMut<MessageListener>> {
122        &self.message_listener
123    }*/
124
125    /*    pub fn message_queue_listener(&self) -> &Option<Arc<Box<dyn MessageQueueListener>>> {
126        &self.message_queue_listener
127    }*/
128
129    pub fn consume_thread_min(&self) -> u32 {
130        self.consume_thread_min
131    }
132
133    pub fn consume_thread_max(&self) -> u32 {
134        self.consume_thread_max
135    }
136
137    pub fn adjust_thread_pool_nums_threshold(&self) -> u64 {
138        self.adjust_thread_pool_nums_threshold
139    }
140
141    pub fn consume_concurrently_max_span(&self) -> u32 {
142        self.consume_concurrently_max_span
143    }
144
145    pub fn pull_threshold_for_queue(&self) -> u32 {
146        self.pull_threshold_for_queue
147    }
148
149    pub fn pop_threshold_for_queue(&self) -> u32 {
150        self.pop_threshold_for_queue
151    }
152
153    pub fn pull_threshold_size_for_queue(&self) -> u32 {
154        self.pull_threshold_size_for_queue
155    }
156
157    pub fn pull_threshold_for_topic(&self) -> i32 {
158        self.pull_threshold_for_topic
159    }
160
161    pub fn pull_threshold_size_for_topic(&self) -> i32 {
162        self.pull_threshold_size_for_topic
163    }
164
165    pub fn pull_interval(&self) -> u64 {
166        self.pull_interval
167    }
168
169    pub fn consume_message_batch_max_size(&self) -> u32 {
170        self.consume_message_batch_max_size
171    }
172
173    pub fn pull_batch_size(&self) -> u32 {
174        self.pull_batch_size
175    }
176
177    pub fn pull_batch_size_in_bytes(&self) -> u32 {
178        self.pull_batch_size_in_bytes
179    }
180
181    pub fn post_subscription_when_pull(&self) -> bool {
182        self.post_subscription_when_pull
183    }
184
185    pub fn unit_mode(&self) -> bool {
186        self.unit_mode
187    }
188
189    pub fn max_reconsume_times(&self) -> i32 {
190        self.max_reconsume_times
191    }
192
193    pub fn suspend_current_queue_time_millis(&self) -> u64 {
194        self.suspend_current_queue_time_millis
195    }
196
197    pub fn consume_timeout(&self) -> u64 {
198        self.consume_timeout
199    }
200
201    pub fn pop_invisible_time(&self) -> u64 {
202        self.pop_invisible_time
203    }
204
205    pub fn pop_batch_nums(&self) -> u32 {
206        self.pop_batch_nums
207    }
208
209    pub fn await_termination_millis_when_shutdown(&self) -> u64 {
210        self.await_termination_millis_when_shutdown
211    }
212
213    pub fn trace_dispatcher(&self) -> &Option<Arc<Box<dyn TraceDispatcher + Send + Sync>>> {
214        &self.trace_dispatcher
215    }
216
217    pub fn client_rebalance(&self) -> bool {
218        self.client_rebalance
219    }
220
221    pub fn rpc_hook(&self) -> &Option<Arc<Box<dyn RPCHook>>> {
222        &self.rpc_hook
223    }
224
225    pub fn set_consumer_group(&mut self, consumer_group: CheetahString) {
226        self.consumer_group = consumer_group;
227    }
228
229    pub fn set_message_model(&mut self, message_model: MessageModel) {
230        self.message_model = message_model;
231    }
232
233    pub fn set_consume_from_where(&mut self, consume_from_where: ConsumeFromWhere) {
234        self.consume_from_where = consume_from_where;
235    }
236
237    pub fn set_consume_timestamp(&mut self, consume_timestamp: Option<CheetahString>) {
238        self.consume_timestamp = consume_timestamp;
239    }
240
241    pub fn set_allocate_message_queue_strategy(
242        &mut self,
243        allocate_message_queue_strategy: Arc<dyn AllocateMessageQueueStrategy>,
244    ) {
245        self.allocate_message_queue_strategy = Some(allocate_message_queue_strategy);
246    }
247
248    /**
249     * This method will be removed in a certain version after April 5, 2020, so please do not
250     * use this method.
251     */
252    pub fn set_subscription(
253        &mut self,
254        subscription: ArcMut<HashMap<CheetahString, CheetahString>>,
255    ) {
256        self.subscription = subscription;
257    }
258
259    /*    pub fn set_message_listener(
260        &mut self,
261        message_listener: Option<ArcMut<MessageListener>>,
262    ) {
263        self.message_listener = message_listener;
264    }*/
265
266    pub fn set_message_queue_listener(
267        &mut self,
268        message_queue_listener: Option<Arc<Box<dyn MessageQueueListener>>>,
269    ) {
270        self.message_queue_listener = message_queue_listener;
271    }
272
273    pub fn set_consume_thread_min(&mut self, consume_thread_min: u32) {
274        self.consume_thread_min = consume_thread_min;
275    }
276
277    pub fn set_consume_thread_max(&mut self, consume_thread_max: u32) {
278        self.consume_thread_max = consume_thread_max;
279    }
280
281    pub fn set_adjust_thread_pool_nums_threshold(
282        &mut self,
283        adjust_thread_pool_nums_threshold: u64,
284    ) {
285        self.adjust_thread_pool_nums_threshold = adjust_thread_pool_nums_threshold;
286    }
287
288    pub fn set_consume_concurrently_max_span(&mut self, consume_concurrently_max_span: u32) {
289        self.consume_concurrently_max_span = consume_concurrently_max_span;
290    }
291
292    pub fn set_pull_threshold_for_queue(&mut self, pull_threshold_for_queue: u32) {
293        self.pull_threshold_for_queue = pull_threshold_for_queue;
294    }
295
296    pub fn set_pop_threshold_for_queue(&mut self, pop_threshold_for_queue: u32) {
297        self.pop_threshold_for_queue = pop_threshold_for_queue;
298    }
299
300    pub fn set_pull_threshold_size_for_queue(&mut self, pull_threshold_size_for_queue: u32) {
301        self.pull_threshold_size_for_queue = pull_threshold_size_for_queue;
302    }
303
304    pub fn set_pull_threshold_for_topic(&mut self, pull_threshold_for_topic: i32) {
305        self.pull_threshold_for_topic = pull_threshold_for_topic;
306    }
307
308    pub fn set_pull_threshold_size_for_topic(&mut self, pull_threshold_size_for_topic: i32) {
309        self.pull_threshold_size_for_topic = pull_threshold_size_for_topic;
310    }
311
312    pub fn set_pull_interval(&mut self, pull_interval: u64) {
313        self.pull_interval = pull_interval;
314    }
315
316    pub fn set_consume_message_batch_max_size(&mut self, consume_message_batch_max_size: u32) {
317        self.consume_message_batch_max_size = consume_message_batch_max_size;
318    }
319
320    pub fn set_pull_batch_size(&mut self, pull_batch_size: u32) {
321        self.pull_batch_size = pull_batch_size;
322    }
323
324    pub fn set_pull_batch_size_in_bytes(&mut self, pull_batch_size_in_bytes: u32) {
325        self.pull_batch_size_in_bytes = pull_batch_size_in_bytes;
326    }
327
328    pub fn set_post_subscription_when_pull(&mut self, post_subscription_when_pull: bool) {
329        self.post_subscription_when_pull = post_subscription_when_pull;
330    }
331
332    pub fn set_unit_mode(&mut self, unit_mode: bool) {
333        self.unit_mode = unit_mode;
334    }
335
336    pub fn set_max_reconsume_times(&mut self, max_reconsume_times: i32) {
337        self.max_reconsume_times = max_reconsume_times;
338    }
339
340    pub fn set_suspend_current_queue_time_millis(
341        &mut self,
342        suspend_current_queue_time_millis: u64,
343    ) {
344        self.suspend_current_queue_time_millis = suspend_current_queue_time_millis;
345    }
346
347    pub fn set_consume_timeout(&mut self, consume_timeout: u64) {
348        self.consume_timeout = consume_timeout;
349    }
350
351    pub fn set_pop_invisible_time(&mut self, pop_invisible_time: u64) {
352        self.pop_invisible_time = pop_invisible_time;
353    }
354
355    pub fn set_pop_batch_nums(&mut self, pop_batch_nums: u32) {
356        self.pop_batch_nums = pop_batch_nums;
357    }
358
359    pub fn set_await_termination_millis_when_shutdown(
360        &mut self,
361        await_termination_millis_when_shutdown: u64,
362    ) {
363        self.await_termination_millis_when_shutdown = await_termination_millis_when_shutdown;
364    }
365
366    pub fn set_trace_dispatcher(
367        &mut self,
368        trace_dispatcher: Option<Arc<Box<dyn TraceDispatcher + Send + Sync>>>,
369    ) {
370        self.trace_dispatcher = trace_dispatcher;
371    }
372
373    pub fn set_client_rebalance(&mut self, client_rebalance: bool) {
374        self.client_rebalance = client_rebalance;
375    }
376
377    pub fn set_rpc_hook(&mut self, rpc_hook: Option<Arc<Box<dyn RPCHook>>>) {
378        self.rpc_hook = rpc_hook;
379    }
380}
381
382impl Default for ConsumerConfig {
383    fn default() -> Self {
384        ConsumerConfig {
385            consumer_group: CheetahString::new(),
386            topic: CheetahString::new(),
387            sub_expression: CheetahString::new(),
388            message_model: MessageModel::Clustering,
389            consume_from_where: ConsumeFromWhere::ConsumeFromLastOffset,
390            consume_timestamp: Some(CheetahString::from_string(
391                util_all::time_millis_to_human_string3(
392                    (get_current_millis() - (1000 * 60 * 30)) as i64,
393                ),
394            )),
395            allocate_message_queue_strategy: Some(Arc::new(AllocateMessageQueueAveragely)),
396            subscription: ArcMut::new(HashMap::new()),
397            message_listener: None,
398            message_queue_listener: None,
399
400            consume_thread_min: 20,
401            consume_thread_max: 64,
402            adjust_thread_pool_nums_threshold: 100_000,
403            consume_concurrently_max_span: 2000,
404            pull_threshold_for_queue: 1000,
405            pop_threshold_for_queue: 96,
406            pull_threshold_size_for_queue: 100,
407            pull_threshold_for_topic: -1,
408            pull_threshold_size_for_topic: -1,
409            pull_interval: 0,
410            consume_message_batch_max_size: 1,
411            pull_batch_size: 32,
412            pull_batch_size_in_bytes: 256 * 1024,
413            post_subscription_when_pull: false,
414            unit_mode: false,
415            max_reconsume_times: -1,
416            suspend_current_queue_time_millis: 1000,
417            consume_timeout: 15,
418            pop_invisible_time: 60000,
419            pop_batch_nums: 32,
420            await_termination_millis_when_shutdown: 0,
421            trace_dispatcher: None,
422            client_rebalance: true,
423            rpc_hook: None,
424        }
425    }
426}
427
428pub struct DefaultMQPushConsumer {
429    client_config: ClientConfig,
430    consumer_config: ArcMut<ConsumerConfig>,
431    pub(crate) default_mqpush_consumer_impl: Option<ArcMut<DefaultMQPushConsumerImpl>>,
432}
433
434impl MQConsumer for DefaultMQPushConsumer {
435    async fn send_message_back(
436        &mut self,
437        msg: MessageExt,
438        delay_level: i32,
439        broker_name: &str,
440    ) -> rocketmq_error::RocketMQResult<()> {
441        todo!()
442    }
443
444    async fn fetch_subscribe_message_queues(
445        &mut self,
446        topic: &str,
447    ) -> rocketmq_error::RocketMQResult<Vec<MessageQueue>> {
448        todo!()
449    }
450}
451
452impl MQAdmin for DefaultMQPushConsumer {
453    fn create_topic(
454        &self,
455        key: &str,
456        new_topic: &str,
457        queue_num: i32,
458        attributes: HashMap<String, String>,
459    ) -> rocketmq_error::RocketMQResult<()> {
460        panic!("This method is not implemented for DefaultMQPushConsumer");
461    }
462
463    fn create_topic_with_flag(
464        &self,
465        key: &str,
466        new_topic: &str,
467        queue_num: i32,
468        topic_sys_flag: i32,
469        attributes: HashMap<String, String>,
470    ) -> rocketmq_error::RocketMQResult<()> {
471        panic!("This method is not implemented for DefaultMQPushConsumer");
472    }
473
474    fn search_offset(
475        &self,
476        mq: &MessageQueue,
477        timestamp: u64,
478    ) -> rocketmq_error::RocketMQResult<i64> {
479        panic!("This method is not implemented for DefaultMQPushConsumer");
480    }
481
482    fn max_offset(&self, mq: &MessageQueue) -> rocketmq_error::RocketMQResult<i64> {
483        panic!("This method is not implemented for DefaultMQPushConsumer");
484    }
485
486    fn min_offset(&self, mq: &MessageQueue) -> rocketmq_error::RocketMQResult<i64> {
487        panic!("This method is not implemented for DefaultMQPushConsumer");
488    }
489
490    fn earliest_msg_store_time(&self, mq: &MessageQueue) -> rocketmq_error::RocketMQResult<u64> {
491        panic!("This method is not implemented for DefaultMQPushConsumer");
492    }
493
494    fn query_message(
495        &self,
496        topic: &str,
497        key: &str,
498        max_num: i32,
499        begin: u64,
500        end: u64,
501    ) -> rocketmq_error::RocketMQResult<QueryResult> {
502        panic!("This method is not implemented for DefaultMQPushConsumer");
503    }
504
505    fn view_message(
506        &self,
507        topic: &str,
508        msg_id: &str,
509    ) -> rocketmq_error::RocketMQResult<MessageExt> {
510        panic!("This method is not implemented for DefaultMQPushConsumer");
511    }
512}
513
514impl MQPushConsumer for DefaultMQPushConsumer {
515    async fn start(&mut self) -> rocketmq_error::RocketMQResult<()> {
516        let consumer_group = NamespaceUtil::wrap_namespace(
517            self.client_config
518                .get_namespace()
519                .unwrap_or_default()
520                .as_str(),
521            self.consumer_config.consumer_group.as_str(),
522        );
523        self.set_consumer_group(consumer_group.as_str());
524        self.default_mqpush_consumer_impl
525            .as_mut()
526            .unwrap()
527            .start()
528            .await?;
529
530        if self.client_config.enable_trace {
531            let mut dispatcher = AsyncTraceDispatcher::new(
532                self.consumer_config.consumer_group.as_str(),
533                Type::Consume,
534                self.client_config.trace_topic.clone().unwrap().as_str(),
535                self.consumer_config.rpc_hook.clone(),
536            );
537            dispatcher
538                .set_host_consumer(self.default_mqpush_consumer_impl.as_ref().unwrap().clone());
539            dispatcher.set_namespace_v2(self.client_config.namespace_v2.clone());
540            let dispatcher: Arc<Box<dyn TraceDispatcher + Send + Sync>> =
541                Arc::new(Box::new(dispatcher));
542            self.consumer_config.trace_dispatcher = Some(dispatcher.clone());
543            let default_mqpush_consumer_impl = self.default_mqpush_consumer_impl.as_mut().unwrap();
544            default_mqpush_consumer_impl.register_consume_message_hook(
545                ConsumeMessageTraceHookImpl::new(dispatcher.clone()),
546            );
547        }
548
549        if let Some(ref rpc_hook) = self.consumer_config.trace_dispatcher {
550            unimplemented!("trace hook");
551        }
552
553        Ok(())
554    }
555
556    async fn shutdown(&mut self) {
557        todo!()
558    }
559
560    fn register_message_listener_concurrently_fn<MLCFN>(&mut self, message_listener: MLCFN)
561    where
562        MLCFN: Fn(
563                Vec<MessageExt>,
564                ConsumeConcurrentlyContext,
565            ) -> rocketmq_error::RocketMQResult<ConsumeConcurrentlyStatus>
566            + Send
567            + Sync,
568    {
569        todo!()
570    }
571
572    fn register_message_listener_concurrently<ML>(&mut self, message_listener: ML)
573    where
574        ML: MessageListenerConcurrently + Send + Sync + 'static,
575    {
576        let message_listener = MessageListener {
577            message_listener_concurrently: Some((Some(Arc::new(Box::new(message_listener))), None)),
578            message_listener_orderly: None,
579        };
580        self.consumer_config.message_listener = Some(ArcMut::new(message_listener));
581        self.default_mqpush_consumer_impl
582            .as_mut()
583            .unwrap()
584            .register_message_listener(self.consumer_config.message_listener.clone());
585    }
586
587    async fn register_message_listener_orderly_fn<MLOFN>(&mut self, message_listener: MLOFN)
588    where
589        MLOFN: Fn(
590                Vec<MessageExt>,
591                ConsumeOrderlyContext,
592            ) -> rocketmq_error::RocketMQResult<ConsumeOrderlyStatus>
593            + Send
594            + Sync,
595    {
596        todo!()
597    }
598
599    fn register_message_listener_orderly<ML>(&mut self, message_listener: ML)
600    where
601        ML: MessageListenerOrderly + Send + Sync + 'static,
602    {
603        let message_listener = MessageListener {
604            message_listener_concurrently: None,
605            message_listener_orderly: Some((Some(Arc::new(Box::new(message_listener))), None)),
606        };
607        self.consumer_config.message_listener = Some(ArcMut::new(message_listener));
608        self.default_mqpush_consumer_impl
609            .as_mut()
610            .unwrap()
611            .register_message_listener(self.consumer_config.message_listener.clone());
612    }
613
614    fn subscribe(
615        &mut self,
616        topic: &str,
617        sub_expression: &str,
618    ) -> rocketmq_error::RocketMQResult<()> {
619        let handle = Handle::current();
620        let mut default_mqpush_consumer_impl = self.default_mqpush_consumer_impl.clone();
621        let topic = topic.to_string();
622        let sub_expression = sub_expression.to_string();
623        match thread::spawn(move || {
624            handle.block_on(async move {
625                default_mqpush_consumer_impl
626                    .as_mut()
627                    .unwrap()
628                    .subscribe(topic.into(), sub_expression.into())
629                    .await
630            })
631        })
632        .join()
633        {
634            Ok(value) => value,
635            Err(er) => {
636                panic!("Error: {er:?}");
637            }
638        }
639    }
640
641    async fn subscribe_with_selector(
642        &mut self,
643        topic: &str,
644        selector: Option<MessageSelector>,
645    ) -> rocketmq_error::RocketMQResult<()> {
646        todo!()
647    }
648
649    async fn unsubscribe(&mut self, topic: &str) {
650        todo!()
651    }
652
653    async fn suspend(&mut self) {
654        todo!()
655    }
656
657    async fn resume(&mut self) {
658        todo!()
659    }
660}
661
662impl DefaultMQPushConsumer {
663    pub fn builder() -> DefaultMQPushConsumerBuilder {
664        DefaultMQPushConsumerBuilder::default()
665    }
666
667    #[inline]
668    pub fn set_consumer_group(&mut self, consumer_group: impl Into<CheetahString>) {
669        self.consumer_config.consumer_group = consumer_group.into();
670    }
671
672    pub fn new(
673        client_config: ClientConfig,
674        consumer_config: ConsumerConfig,
675    ) -> DefaultMQPushConsumer {
676        let consumer_config = ArcMut::new(consumer_config);
677        let mut default_mqpush_consumer_impl = ArcMut::new(DefaultMQPushConsumerImpl::new(
678            client_config.clone(),
679            consumer_config.clone(),
680            consumer_config.rpc_hook.clone(),
681        ));
682        let wrapper = default_mqpush_consumer_impl.clone();
683        default_mqpush_consumer_impl.set_default_mqpush_consumer_impl(wrapper);
684        DefaultMQPushConsumer {
685            client_config,
686            consumer_config,
687            default_mqpush_consumer_impl: Some(default_mqpush_consumer_impl),
688        }
689    }
690
691    pub fn set_name_server_addr(&mut self, name_server_addr: CheetahString) {
692        self.client_config.namesrv_addr = Some(name_server_addr);
693        self.client_config
694            .namespace_initialized
695            .store(false, std::sync::atomic::Ordering::Release);
696    }
697
698    pub fn set_consume_from_where(&mut self, consume_from_where: ConsumeFromWhere) {
699        self.consumer_config.consume_from_where = consume_from_where;
700    }
701}