Skip to main content

rocketmq_client_rust/consumer/
default_lite_pull_consumer.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::collections::HashSet;
17use std::sync::Arc;
18
19use cheetah_string::CheetahString;
20use rocketmq_common::common::message::message_ext::MessageExt;
21use rocketmq_common::common::message::message_queue::MessageQueue;
22use rocketmq_common::common::topic::TopicValidator;
23use rocketmq_error::RocketMQResult;
24use rocketmq_remoting::protocol::namespace_util::NamespaceUtil;
25use rocketmq_remoting::runtime::RPCHook;
26use rocketmq_rust::ArcMut;
27use tokio::sync::OnceCell;
28use tokio::sync::RwLock;
29
30use crate::base::client_config::ClientConfig;
31use crate::consumer::consumer_impl::default_lite_pull_consumer_impl::DefaultLitePullConsumerImpl;
32use crate::consumer::consumer_impl::default_lite_pull_consumer_impl::LitePullConsumerConfig;
33use crate::consumer::default_lite_pull_consumer_builder::DefaultLitePullConsumerBuilder;
34use crate::consumer::lite_pull_consumer::LitePullConsumer;
35use crate::consumer::message_queue_listener::MessageQueueListener;
36use crate::consumer::message_selector::MessageSelector;
37use crate::consumer::mq_consumer::MQConsumer;
38use crate::consumer::topic_message_queue_change_listener::TopicMessageQueueChangeListener;
39use crate::trace::async_trace_dispatcher::AsyncTraceDispatcher;
40use crate::trace::hook::consume_message_trace_hook_impl::ConsumeMessageTraceHookImpl;
41use crate::trace::trace_dispatcher::TraceDispatcher;
42use crate::trace::trace_dispatcher::Type;
43
44/// Default implementation of a lite pull consumer.
45///
46/// This is the main entry point for creating and using a lite pull consumer. It acts as a facade
47/// over the internal implementation ([`DefaultLitePullConsumerImpl`]) and provides:
48///
49/// - Configuration management via [`DefaultLitePullConsumerBuilder`]
50/// - Namespace handling (automatic topic name wrapping/unwrapping)
51/// - Optional message trace integration
52/// - A clean public API that delegates to the internal implementation
53///
54/// # Architecture
55///
56/// ```text
57/// ┌────────────────────────────────────┐
58/// │   DefaultLitePullConsumer          │  ← Public Facade
59/// │  (config, trace, namespace)        │
60/// └──────────────┬─────────────────────┘
61///                │ delegates
62///                ↓
63/// ┌────────────────────────────────────┐
64/// │ DefaultLitePullConsumerImpl        │  ← Core Logic
65/// │  (lifecycle, pull, commit, etc.)   │
66/// └────────────────────────────────────┘
67/// ```
68///
69/// # Examples
70///
71/// ## Basic usage with auto-commit
72///
73/// ```rust,ignore
74/// use rocketmq_client::consumer::default_lite_pull_consumer::DefaultLitePullConsumer;
75///
76/// let consumer = DefaultLitePullConsumer::builder()
77///     .consumer_group("my_consumer_group")
78///     .name_server_addr("127.0.0.1:9876")
79///     .auto_commit(true)
80///     .build();
81///
82/// consumer.start().await?;
83/// consumer.subscribe("my_topic").await?;
84///
85/// loop {
86///     let messages = consumer.poll_with_timeout(1000).await;
87///     for msg in messages {
88///         println!("Received: {:?}", msg);
89///     }
90///     // Offsets are auto-committed
91/// }
92/// ```
93///
94/// ## Manual offset control
95///
96/// ```rust,ignore
97/// let consumer = DefaultLitePullConsumer::builder()
98///     .consumer_group("my_consumer_group")
99///     .name_server_addr("127.0.0.1:9876")
100///     .auto_commit(false)  // Disable auto-commit
101///     .build();
102///
103/// consumer.start().await?;
104/// consumer.subscribe("my_topic").await?;
105///
106/// loop {
107///     let messages = consumer.poll_with_timeout(1000).await;
108///     for msg in messages {
109///         // Process message
110///     }
111///     
112///     // Manually commit after processing
113///     consumer.commit_all().await?;
114/// }
115/// ```
116///
117/// ## Manual queue assignment (no rebalance)
118///
119/// ```rust,ignore
120/// use rocketmq_common::common::message::message_queue::MessageQueue;
121///
122/// let mq1 = MessageQueue::from_parts("my_topic", "broker-a", 0);
123/// let mq2 = MessageQueue::from_parts("my_topic", "broker-a", 1);
124///
125/// consumer.start().await?;
126/// consumer.assign(vec![mq1, mq2]).await;
127///
128/// let messages = consumer.poll().await;
129/// ```
130#[derive(Clone)]
131pub struct DefaultLitePullConsumer {
132    /// Client configuration (network, instance name, etc.)
133    client_config: ArcMut<ClientConfig>,
134
135    /// Consumer-specific configuration (pull batch size, flow control, etc.)
136    consumer_config: ArcMut<LitePullConsumerConfig>,
137
138    /// Core implementation (lazy-initialized on start using OnceCell)
139    default_lite_pull_consumer_impl: Arc<OnceCell<ArcMut<DefaultLitePullConsumerImpl>>>,
140
141    /// Optional RPC hook for request/response interception
142    rpc_hook: Option<Arc<dyn RPCHook>>,
143
144    /// Optional trace dispatcher for message tracing (with interior mutability)
145    trace_dispatcher: Arc<RwLock<Option<Arc<dyn TraceDispatcher + Send + Sync>>>>,
146
147    /// Whether message trace is enabled
148    enable_msg_trace: bool,
149
150    /// Custom trace topic (if specified)
151    custom_trace_topic: Option<CheetahString>,
152}
153
154impl DefaultLitePullConsumer {
155    /// Creates a new consumer with the specified configuration.
156    ///
157    /// Most users should use [`builder()`](Self::builder) instead.
158    pub fn new(
159        client_config: ArcMut<ClientConfig>,
160        consumer_config: ArcMut<LitePullConsumerConfig>,
161        rpc_hook: Option<Arc<dyn RPCHook>>,
162        trace_dispatcher: Option<Arc<dyn TraceDispatcher + Send + Sync>>,
163        enable_msg_trace: bool,
164        custom_trace_topic: Option<CheetahString>,
165    ) -> Self {
166        Self {
167            client_config,
168            consumer_config,
169            default_lite_pull_consumer_impl: Arc::new(OnceCell::new()),
170            rpc_hook,
171            trace_dispatcher: Arc::new(RwLock::new(trace_dispatcher)),
172            enable_msg_trace,
173            custom_trace_topic,
174        }
175    }
176
177    /// Returns a builder for creating a consumer with custom configuration.
178    ///
179    /// # Examples
180    ///
181    /// ```rust,ignore
182    /// let consumer = DefaultLitePullConsumer::builder()
183    ///     .consumer_group("my_group")
184    ///     .name_server_addr("127.0.0.1:9876")
185    ///     .pull_batch_size(32)
186    ///     .build();
187    /// ```
188    pub fn builder() -> DefaultLitePullConsumerBuilder {
189        DefaultLitePullConsumerBuilder::new()
190    }
191
192    /// Returns the consumer group name.
193    pub fn consumer_group(&self) -> &CheetahString {
194        &self.consumer_config.consumer_group
195    }
196
197    /// Returns the namespace (if configured).
198    pub fn namespace(&self) -> Option<CheetahString> {
199        self.client_config.get_namespace_v2().cloned()
200    }
201
202    /// Returns the client configuration.
203    pub fn client_config(&self) -> &ArcMut<ClientConfig> {
204        &self.client_config
205    }
206
207    /// Returns the consumer configuration.
208    pub fn consumer_config(&self) -> &ArcMut<LitePullConsumerConfig> {
209        &self.consumer_config
210    }
211
212    /// Wraps a topic name with the namespace (if configured).
213    fn with_namespace(&self, resource: &str) -> CheetahString {
214        match self.client_config.get_namespace_v2() {
215            Some(namespace) => NamespaceUtil::wrap_namespace(namespace.as_str(), resource),
216            None => CheetahString::from_string(resource.to_string()),
217        }
218    }
219
220    /// Removes namespace from a topic name (if configured).
221    fn without_namespace(&self, resource: &str) -> CheetahString {
222        match self.client_config.get_namespace_v2() {
223            Some(namespace) => NamespaceUtil::without_namespace_with_namespace(resource, namespace.as_str()).into(),
224            None => CheetahString::from_string(resource.to_string()),
225        }
226    }
227
228    /// Wraps a message queue with namespace.
229    fn queue_with_namespace(&self, mq: &MessageQueue) -> MessageQueue {
230        match self.client_config.get_namespace_v2() {
231            Some(namespace) => {
232                let wrapped_topic = NamespaceUtil::wrap_namespace(namespace.as_str(), mq.topic().clone());
233                MessageQueue::from_parts(wrapped_topic, mq.broker_name().clone(), mq.queue_id())
234            }
235            None => mq.clone(),
236        }
237    }
238
239    /// Removes namespace from a message queue.
240    fn queue_without_namespace(&self, mq: &MessageQueue) -> MessageQueue {
241        match self.client_config.get_namespace_v2() {
242            Some(namespace) => {
243                let unwrapped_topic = NamespaceUtil::without_namespace_with_namespace(mq.topic(), namespace.as_str());
244                MessageQueue::from_parts(unwrapped_topic, mq.broker_name().clone(), mq.queue_id())
245            }
246            None => mq.clone(),
247        }
248    }
249
250    /// Initializes the trace dispatcher if message trace is enabled.
251    async fn init_trace_dispatcher_internal(&self, impl_: &ArcMut<DefaultLitePullConsumerImpl>) -> RocketMQResult<()> {
252        if !self.enable_msg_trace {
253            return Ok(());
254        }
255
256        let mut dispatcher_guard = self.trace_dispatcher.write().await;
257        if dispatcher_guard.is_some() {
258            return Ok(());
259        }
260
261        // Create default async trace dispatcher
262        let trace_topic = self
263            .custom_trace_topic
264            .as_ref()
265            .map(|t| t.as_str())
266            .unwrap_or(TopicValidator::RMQ_SYS_TRACE_TOPIC);
267
268        let dispatcher = AsyncTraceDispatcher::new(
269            self.consumer_config.consumer_group.as_str(),
270            Type::Consume,
271            20, // batch_num
272            trace_topic,
273            None, // rpc_hook - convert if needed
274        );
275
276        let dispatcher_arc: Arc<dyn TraceDispatcher + Send + Sync> = Arc::new(dispatcher);
277        *dispatcher_guard = Some(dispatcher_arc.clone());
278
279        // Register trace hook to impl
280        let hook = Arc::new(ConsumeMessageTraceHookImpl::new(dispatcher_arc));
281        impl_.mut_from_ref().register_consume_message_hook(hook);
282
283        Ok(())
284    }
285
286    /// Returns a reference to the internal implementation.
287    ///
288    /// # Panics
289    ///
290    /// Panics if called before `start()`.
291    fn impl_(&self) -> &ArcMut<DefaultLitePullConsumerImpl> {
292        self.default_lite_pull_consumer_impl
293            .get()
294            .expect("Consumer not started. Call start() first.")
295    }
296}
297
298#[allow(unused)]
299impl LitePullConsumer for DefaultLitePullConsumer {
300    async fn start(&self) -> RocketMQResult<()> {
301        // Initialize impl using OnceCell::get_or_try_init for thread-safe lazy initialization
302        let impl_ = self
303            .default_lite_pull_consumer_impl
304            .get_or_try_init(|| async {
305                let impl_ = ArcMut::new(DefaultLitePullConsumerImpl::new(
306                    self.client_config.clone(),
307                    self.consumer_config.clone(),
308                ));
309
310                // Initialize trace if enabled
311                self.init_trace_dispatcher_internal(&impl_).await?;
312
313                Ok::<ArcMut<DefaultLitePullConsumerImpl>, rocketmq_error::RocketMQError>(impl_)
314            })
315            .await?;
316
317        // Start the impl
318        impl_.mut_from_ref().start().await
319    }
320
321    async fn shutdown(&self) {
322        if let Some(impl_) = self.default_lite_pull_consumer_impl.get() {
323            let _ = impl_.mut_from_ref().shutdown().await;
324        }
325
326        // Shutdown trace dispatcher
327        if let Some(dispatcher) = self.trace_dispatcher.write().await.take() {
328            dispatcher.shutdown();
329        }
330    }
331
332    async fn is_running(&self) -> bool {
333        match self.default_lite_pull_consumer_impl.get() {
334            Some(impl_) => impl_.is_running().await,
335            None => false,
336        }
337    }
338
339    async fn subscribe(&self, topic: &str) -> RocketMQResult<()> {
340        self.subscribe_with_expression(topic, "*").await
341    }
342
343    async fn subscribe_with_expression(&self, topic: &str, sub_expression: &str) -> RocketMQResult<()> {
344        let wrapped_topic = self.with_namespace(topic);
345        self.impl_()
346            .mut_from_ref()
347            .subscribe(wrapped_topic, sub_expression)
348            .await
349    }
350
351    async fn subscribe_with_listener<MQL>(&self, topic: &str, sub_expression: &str, listener: MQL) -> RocketMQResult<()>
352    where
353        MQL: MessageQueueListener + 'static,
354    {
355        let wrapped_topic = self.with_namespace(topic);
356        self.impl_()
357            .mut_from_ref()
358            .subscribe_with_listener(wrapped_topic, sub_expression, listener)
359            .await
360    }
361
362    async fn subscribe_with_selector(&self, topic: &str, selector: Option<MessageSelector>) -> RocketMQResult<()> {
363        let wrapped_topic = self.with_namespace(topic);
364        self.impl_()
365            .mut_from_ref()
366            .subscribe_with_selector(wrapped_topic, selector)
367            .await
368    }
369
370    async fn unsubscribe(&self, topic: &str) {
371        let wrapped_topic = self.with_namespace(topic);
372        if let Err(e) = self.impl_().mut_from_ref().unsubscribe(wrapped_topic).await {
373            tracing::warn!("Failed to unsubscribe from topic {}: {}", topic, e);
374        }
375    }
376
377    async fn assignment(&self) -> RocketMQResult<HashSet<MessageQueue>> {
378        let assignment = self.impl_().assignment().await;
379
380        // Remove namespace from all queues
381        let mut result = HashSet::with_capacity(assignment.len());
382        for mq in assignment {
383            result.insert(self.queue_without_namespace(&mq));
384        }
385
386        Ok(result)
387    }
388
389    async fn assign(&self, message_queues: Vec<MessageQueue>) {
390        // Wrap namespace for all queues
391        let wrapped_queues: Vec<_> = message_queues.iter().map(|mq| self.queue_with_namespace(mq)).collect();
392
393        self.impl_().mut_from_ref().assign(wrapped_queues).await;
394    }
395
396    async fn set_sub_expression_for_assign(&self, topic: &str, sub_expression: &str) {
397        let wrapped_topic = self.with_namespace(topic);
398        self.impl_()
399            .set_sub_expression_for_assign(wrapped_topic, sub_expression)
400            .await;
401    }
402
403    async fn build_subscriptions_for_heartbeat(
404        &self,
405        sub_expression_map: &mut HashMap<String, MessageSelector>,
406    ) -> RocketMQResult<()> {
407        self.impl_().build_subscriptions_for_heartbeat(sub_expression_map).await
408    }
409
410    /// Zero-copy implementation.
411    async fn poll_zero_copy(&self) -> Vec<ArcMut<MessageExt>> {
412        match self.impl_().poll(self.consumer_config.poll_timeout_millis).await {
413            Ok(messages) => messages,
414            Err(e) => {
415                tracing::error!("Poll failed: {}", e);
416                Vec::new()
417            }
418        }
419    }
420
421    /// Zero-copy implementation with custom timeout.
422    async fn poll_with_timeout_zero_copy(&self, timeout: u64) -> Vec<ArcMut<MessageExt>> {
423        match self.impl_().poll_with_timeout(timeout).await {
424            Ok(messages) => messages,
425            Err(e) => {
426                tracing::error!("Poll failed: {}", e);
427                Vec::new()
428            }
429        }
430    }
431
432    /// Delegates to zero-copy implementation and creates owned message copies.
433    async fn poll(&self) -> Vec<MessageExt> {
434        self.poll_zero_copy()
435            .await
436            .into_iter()
437            .map(|arc_mut| (*arc_mut).clone())
438            .collect()
439    }
440
441    /// Delegates to zero-copy implementation with timeout and creates owned message copies.
442    async fn poll_with_timeout(&self, timeout: u64) -> Vec<MessageExt> {
443        self.poll_with_timeout_zero_copy(timeout)
444            .await
445            .into_iter()
446            .map(|arc_mut| (*arc_mut).clone())
447            .collect()
448    }
449
450    async fn fetch_message_queues(&self, topic: &str) -> RocketMQResult<Vec<MessageQueue>> {
451        let wrapped_topic = self.with_namespace(topic);
452        let queues = self.impl_().fetch_message_queues(wrapped_topic).await?;
453
454        // Remove namespace from all queues and convert to Vec
455        let result: Vec<_> = queues.into_iter().map(|mq| self.queue_without_namespace(&mq)).collect();
456
457        Ok(result)
458    }
459
460    async fn committed(&self, message_queue: &MessageQueue) -> RocketMQResult<i64> {
461        let wrapped_mq = self.queue_with_namespace(message_queue);
462        self.impl_().committed(&wrapped_mq).await
463    }
464
465    async fn commit_all(&self) -> RocketMQResult<()> {
466        self.impl_().commit_all().await
467    }
468
469    async fn commit_with_map(&self, offset_map: HashMap<MessageQueue, i64>, persist: bool) {
470        // Wrap namespace for all queues
471        let mut wrapped_offsets = HashMap::with_capacity(offset_map.len());
472        for (mq, offset) in offset_map {
473            wrapped_offsets.insert(self.queue_with_namespace(&mq), offset);
474        }
475
476        if let Err(e) = self.impl_().commit(wrapped_offsets, persist).await {
477            tracing::warn!("commit_with_map failed: {}", e);
478        }
479    }
480
481    async fn commit(&self) {
482        if let Err(e) = self.commit_all().await {
483            tracing::warn!("commit failed: {}", e);
484        }
485    }
486
487    async fn commit_with_set(&self, message_queues: HashSet<MessageQueue>, persist: bool) {
488        // Wrap namespace for all queues
489        let wrapped_queues: HashSet<_> = message_queues.iter().map(|mq| self.queue_with_namespace(mq)).collect();
490
491        // Get current offsets for specified queues
492        let mut offset_map = HashMap::new();
493        for mq in &wrapped_queues {
494            if let Ok(offset) = self.impl_().committed(mq).await {
495                offset_map.insert(mq.clone(), offset);
496            }
497        }
498
499        if let Err(e) = self.impl_().commit(offset_map, persist).await {
500            tracing::warn!("commit_with_set failed: {}", e);
501        }
502    }
503
504    async fn commit_sync(&self) {
505        // Commit all offsets (trait note: misleading name, doesn't actually block)
506        if let Err(e) = self.commit_all().await {
507            tracing::warn!("commit_sync failed: {}", e);
508        }
509    }
510
511    async fn commit_sync_with_map(&self, offset_map: HashMap<MessageQueue, i64>, persist: bool) {
512        // Commit specific offsets (trait note: misleading name, doesn't actually block)
513        self.commit_with_map(offset_map, persist).await;
514    }
515
516    async fn is_auto_commit(&self) -> bool {
517        self.consumer_config.auto_commit
518    }
519
520    async fn set_auto_commit(&self, auto_commit: bool) {
521        self.consumer_config.mut_from_ref().auto_commit = auto_commit;
522    }
523
524    async fn offset_for_timestamp(&self, message_queue: &MessageQueue, timestamp: u64) -> RocketMQResult<i64> {
525        let wrapped_mq = self.queue_with_namespace(message_queue);
526        self.impl_().offset_for_timestamp(&wrapped_mq, timestamp).await
527    }
528
529    async fn pause(&self, message_queues: Vec<MessageQueue>) {
530        let wrapped_queues: Vec<_> = message_queues.iter().map(|mq| self.queue_with_namespace(mq)).collect();
531
532        self.impl_().pause(&wrapped_queues).await;
533    }
534
535    async fn resume(&self, message_queues: Vec<MessageQueue>) {
536        let wrapped_queues: Vec<_> = message_queues.iter().map(|mq| self.queue_with_namespace(mq)).collect();
537
538        self.impl_().resume(&wrapped_queues).await;
539    }
540
541    async fn is_paused(&self, message_queue: &MessageQueue) -> bool {
542        let wrapped_mq = self.queue_with_namespace(message_queue);
543        self.impl_().is_paused(&wrapped_mq).await
544    }
545
546    async fn seek(&self, message_queue: &MessageQueue, offset: i64) -> RocketMQResult<()> {
547        let wrapped_mq = self.queue_with_namespace(message_queue);
548        self.impl_().seek(&wrapped_mq, offset).await
549    }
550
551    async fn seek_to_begin(&self, message_queue: &MessageQueue) -> RocketMQResult<()> {
552        let wrapped_mq = self.queue_with_namespace(message_queue);
553        self.impl_().seek_to_begin(&wrapped_mq).await
554    }
555
556    async fn seek_to_end(&self, message_queue: &MessageQueue) -> RocketMQResult<()> {
557        let wrapped_mq = self.queue_with_namespace(message_queue);
558        self.impl_().seek_to_end(&wrapped_mq).await
559    }
560
561    async fn update_name_server_address(&self, name_server_address: &str) {
562        self.client_config
563            .mut_from_ref()
564            .set_namesrv_addr(CheetahString::from_string(name_server_address.to_string()));
565
566        if let Some(impl_) = self.default_lite_pull_consumer_impl.get() {
567            let addresses: Vec<String> = name_server_address
568                .split(';')
569                .map(|s| s.trim().to_string())
570                .filter(|s| !s.is_empty())
571                .collect();
572            impl_.update_name_server_address(addresses).await;
573        }
574    }
575
576    async fn register_topic_message_queue_change_listener<TL>(&self, topic: &str, listener: TL) -> RocketMQResult<()>
577    where
578        TL: TopicMessageQueueChangeListener + 'static,
579    {
580        let wrapped_topic = self.with_namespace(topic);
581        let listener_arc: Arc<dyn TopicMessageQueueChangeListener + Send + Sync> = Arc::new(listener);
582        self.impl_()
583            .register_topic_message_queue_change_listener(wrapped_topic, listener_arc)
584            .await
585    }
586}
587
588impl MQConsumer for DefaultLitePullConsumer {
589    async fn send_message_back(&mut self, msg: MessageExt, delay_level: i32, broker_name: &str) -> RocketMQResult<()> {
590        // Lite pull consumer doesn't support send message back
591        // This is typically used in push consumer for retry
592        Err(crate::mq_client_err!(
593            -1,
594            "sendMessageBack is not supported in lite pull consumer"
595        ))
596    }
597
598    async fn fetch_subscribe_message_queues(&mut self, topic: &str) -> RocketMQResult<Vec<MessageQueue>> {
599        let queues = self.fetch_message_queues(topic).await?;
600        Ok(queues.into_iter().collect())
601    }
602}