rocketmq_client_rust/consumer/
default_lite_pull_consumer.rs1use std::collections::HashMap;
16use std::collections::HashSet;
17use std::sync::Arc;
18
19use cheetah_string::CheetahString;
20use rocketmq_common::common::message::message_ext::MessageExt;
21use rocketmq_common::common::message::message_queue::MessageQueue;
22use rocketmq_common::common::topic::TopicValidator;
23use rocketmq_error::RocketMQResult;
24use rocketmq_remoting::protocol::namespace_util::NamespaceUtil;
25use rocketmq_remoting::runtime::RPCHook;
26use rocketmq_rust::ArcMut;
27use tokio::sync::OnceCell;
28use tokio::sync::RwLock;
29
30use crate::base::client_config::ClientConfig;
31use crate::consumer::consumer_impl::default_lite_pull_consumer_impl::DefaultLitePullConsumerImpl;
32use crate::consumer::consumer_impl::default_lite_pull_consumer_impl::LitePullConsumerConfig;
33use crate::consumer::default_lite_pull_consumer_builder::DefaultLitePullConsumerBuilder;
34use crate::consumer::lite_pull_consumer::LitePullConsumer;
35use crate::consumer::message_queue_listener::MessageQueueListener;
36use crate::consumer::message_selector::MessageSelector;
37use crate::consumer::mq_consumer::MQConsumer;
38use crate::consumer::topic_message_queue_change_listener::TopicMessageQueueChangeListener;
39use crate::trace::async_trace_dispatcher::AsyncTraceDispatcher;
40use crate::trace::hook::consume_message_trace_hook_impl::ConsumeMessageTraceHookImpl;
41use crate::trace::trace_dispatcher::TraceDispatcher;
42use crate::trace::trace_dispatcher::Type;
43
44#[derive(Clone)]
131pub struct DefaultLitePullConsumer {
132 client_config: ArcMut<ClientConfig>,
134
135 consumer_config: ArcMut<LitePullConsumerConfig>,
137
138 default_lite_pull_consumer_impl: Arc<OnceCell<ArcMut<DefaultLitePullConsumerImpl>>>,
140
141 rpc_hook: Option<Arc<dyn RPCHook>>,
143
144 trace_dispatcher: Arc<RwLock<Option<Arc<dyn TraceDispatcher + Send + Sync>>>>,
146
147 enable_msg_trace: bool,
149
150 custom_trace_topic: Option<CheetahString>,
152}
153
154impl DefaultLitePullConsumer {
155 pub fn new(
159 client_config: ArcMut<ClientConfig>,
160 consumer_config: ArcMut<LitePullConsumerConfig>,
161 rpc_hook: Option<Arc<dyn RPCHook>>,
162 trace_dispatcher: Option<Arc<dyn TraceDispatcher + Send + Sync>>,
163 enable_msg_trace: bool,
164 custom_trace_topic: Option<CheetahString>,
165 ) -> Self {
166 Self {
167 client_config,
168 consumer_config,
169 default_lite_pull_consumer_impl: Arc::new(OnceCell::new()),
170 rpc_hook,
171 trace_dispatcher: Arc::new(RwLock::new(trace_dispatcher)),
172 enable_msg_trace,
173 custom_trace_topic,
174 }
175 }
176
177 pub fn builder() -> DefaultLitePullConsumerBuilder {
189 DefaultLitePullConsumerBuilder::new()
190 }
191
192 pub fn consumer_group(&self) -> &CheetahString {
194 &self.consumer_config.consumer_group
195 }
196
197 pub fn namespace(&self) -> Option<CheetahString> {
199 self.client_config.get_namespace_v2().cloned()
200 }
201
202 pub fn client_config(&self) -> &ArcMut<ClientConfig> {
204 &self.client_config
205 }
206
207 pub fn consumer_config(&self) -> &ArcMut<LitePullConsumerConfig> {
209 &self.consumer_config
210 }
211
212 fn with_namespace(&self, resource: &str) -> CheetahString {
214 match self.client_config.get_namespace_v2() {
215 Some(namespace) => NamespaceUtil::wrap_namespace(namespace.as_str(), resource),
216 None => CheetahString::from_string(resource.to_string()),
217 }
218 }
219
220 fn without_namespace(&self, resource: &str) -> CheetahString {
222 match self.client_config.get_namespace_v2() {
223 Some(namespace) => NamespaceUtil::without_namespace_with_namespace(resource, namespace.as_str()).into(),
224 None => CheetahString::from_string(resource.to_string()),
225 }
226 }
227
228 fn queue_with_namespace(&self, mq: &MessageQueue) -> MessageQueue {
230 match self.client_config.get_namespace_v2() {
231 Some(namespace) => {
232 let wrapped_topic = NamespaceUtil::wrap_namespace(namespace.as_str(), mq.topic().clone());
233 MessageQueue::from_parts(wrapped_topic, mq.broker_name().clone(), mq.queue_id())
234 }
235 None => mq.clone(),
236 }
237 }
238
239 fn queue_without_namespace(&self, mq: &MessageQueue) -> MessageQueue {
241 match self.client_config.get_namespace_v2() {
242 Some(namespace) => {
243 let unwrapped_topic = NamespaceUtil::without_namespace_with_namespace(mq.topic(), namespace.as_str());
244 MessageQueue::from_parts(unwrapped_topic, mq.broker_name().clone(), mq.queue_id())
245 }
246 None => mq.clone(),
247 }
248 }
249
250 async fn init_trace_dispatcher_internal(&self, impl_: &ArcMut<DefaultLitePullConsumerImpl>) -> RocketMQResult<()> {
252 if !self.enable_msg_trace {
253 return Ok(());
254 }
255
256 let mut dispatcher_guard = self.trace_dispatcher.write().await;
257 if dispatcher_guard.is_some() {
258 return Ok(());
259 }
260
261 let trace_topic = self
263 .custom_trace_topic
264 .as_ref()
265 .map(|t| t.as_str())
266 .unwrap_or(TopicValidator::RMQ_SYS_TRACE_TOPIC);
267
268 let dispatcher = AsyncTraceDispatcher::new(
269 self.consumer_config.consumer_group.as_str(),
270 Type::Consume,
271 20, trace_topic,
273 None, );
275
276 let dispatcher_arc: Arc<dyn TraceDispatcher + Send + Sync> = Arc::new(dispatcher);
277 *dispatcher_guard = Some(dispatcher_arc.clone());
278
279 let hook = Arc::new(ConsumeMessageTraceHookImpl::new(dispatcher_arc));
281 impl_.mut_from_ref().register_consume_message_hook(hook);
282
283 Ok(())
284 }
285
286 fn impl_(&self) -> &ArcMut<DefaultLitePullConsumerImpl> {
292 self.default_lite_pull_consumer_impl
293 .get()
294 .expect("Consumer not started. Call start() first.")
295 }
296}
297
298#[allow(unused)]
299impl LitePullConsumer for DefaultLitePullConsumer {
300 async fn start(&self) -> RocketMQResult<()> {
301 let impl_ = self
303 .default_lite_pull_consumer_impl
304 .get_or_try_init(|| async {
305 let impl_ = ArcMut::new(DefaultLitePullConsumerImpl::new(
306 self.client_config.clone(),
307 self.consumer_config.clone(),
308 ));
309
310 self.init_trace_dispatcher_internal(&impl_).await?;
312
313 Ok::<ArcMut<DefaultLitePullConsumerImpl>, rocketmq_error::RocketMQError>(impl_)
314 })
315 .await?;
316
317 impl_.mut_from_ref().start().await
319 }
320
321 async fn shutdown(&self) {
322 if let Some(impl_) = self.default_lite_pull_consumer_impl.get() {
323 let _ = impl_.mut_from_ref().shutdown().await;
324 }
325
326 if let Some(dispatcher) = self.trace_dispatcher.write().await.take() {
328 dispatcher.shutdown();
329 }
330 }
331
332 async fn is_running(&self) -> bool {
333 match self.default_lite_pull_consumer_impl.get() {
334 Some(impl_) => impl_.is_running().await,
335 None => false,
336 }
337 }
338
339 async fn subscribe(&self, topic: &str) -> RocketMQResult<()> {
340 self.subscribe_with_expression(topic, "*").await
341 }
342
343 async fn subscribe_with_expression(&self, topic: &str, sub_expression: &str) -> RocketMQResult<()> {
344 let wrapped_topic = self.with_namespace(topic);
345 self.impl_()
346 .mut_from_ref()
347 .subscribe(wrapped_topic, sub_expression)
348 .await
349 }
350
351 async fn subscribe_with_listener<MQL>(&self, topic: &str, sub_expression: &str, listener: MQL) -> RocketMQResult<()>
352 where
353 MQL: MessageQueueListener + 'static,
354 {
355 let wrapped_topic = self.with_namespace(topic);
356 self.impl_()
357 .mut_from_ref()
358 .subscribe_with_listener(wrapped_topic, sub_expression, listener)
359 .await
360 }
361
362 async fn subscribe_with_selector(&self, topic: &str, selector: Option<MessageSelector>) -> RocketMQResult<()> {
363 let wrapped_topic = self.with_namespace(topic);
364 self.impl_()
365 .mut_from_ref()
366 .subscribe_with_selector(wrapped_topic, selector)
367 .await
368 }
369
370 async fn unsubscribe(&self, topic: &str) {
371 let wrapped_topic = self.with_namespace(topic);
372 if let Err(e) = self.impl_().mut_from_ref().unsubscribe(wrapped_topic).await {
373 tracing::warn!("Failed to unsubscribe from topic {}: {}", topic, e);
374 }
375 }
376
377 async fn assignment(&self) -> RocketMQResult<HashSet<MessageQueue>> {
378 let assignment = self.impl_().assignment().await;
379
380 let mut result = HashSet::with_capacity(assignment.len());
382 for mq in assignment {
383 result.insert(self.queue_without_namespace(&mq));
384 }
385
386 Ok(result)
387 }
388
389 async fn assign(&self, message_queues: Vec<MessageQueue>) {
390 let wrapped_queues: Vec<_> = message_queues.iter().map(|mq| self.queue_with_namespace(mq)).collect();
392
393 self.impl_().mut_from_ref().assign(wrapped_queues).await;
394 }
395
396 async fn set_sub_expression_for_assign(&self, topic: &str, sub_expression: &str) {
397 let wrapped_topic = self.with_namespace(topic);
398 self.impl_()
399 .set_sub_expression_for_assign(wrapped_topic, sub_expression)
400 .await;
401 }
402
403 async fn build_subscriptions_for_heartbeat(
404 &self,
405 sub_expression_map: &mut HashMap<String, MessageSelector>,
406 ) -> RocketMQResult<()> {
407 self.impl_().build_subscriptions_for_heartbeat(sub_expression_map).await
408 }
409
410 async fn poll_zero_copy(&self) -> Vec<ArcMut<MessageExt>> {
412 match self.impl_().poll(self.consumer_config.poll_timeout_millis).await {
413 Ok(messages) => messages,
414 Err(e) => {
415 tracing::error!("Poll failed: {}", e);
416 Vec::new()
417 }
418 }
419 }
420
421 async fn poll_with_timeout_zero_copy(&self, timeout: u64) -> Vec<ArcMut<MessageExt>> {
423 match self.impl_().poll_with_timeout(timeout).await {
424 Ok(messages) => messages,
425 Err(e) => {
426 tracing::error!("Poll failed: {}", e);
427 Vec::new()
428 }
429 }
430 }
431
432 async fn poll(&self) -> Vec<MessageExt> {
434 self.poll_zero_copy()
435 .await
436 .into_iter()
437 .map(|arc_mut| (*arc_mut).clone())
438 .collect()
439 }
440
441 async fn poll_with_timeout(&self, timeout: u64) -> Vec<MessageExt> {
443 self.poll_with_timeout_zero_copy(timeout)
444 .await
445 .into_iter()
446 .map(|arc_mut| (*arc_mut).clone())
447 .collect()
448 }
449
450 async fn fetch_message_queues(&self, topic: &str) -> RocketMQResult<Vec<MessageQueue>> {
451 let wrapped_topic = self.with_namespace(topic);
452 let queues = self.impl_().fetch_message_queues(wrapped_topic).await?;
453
454 let result: Vec<_> = queues.into_iter().map(|mq| self.queue_without_namespace(&mq)).collect();
456
457 Ok(result)
458 }
459
460 async fn committed(&self, message_queue: &MessageQueue) -> RocketMQResult<i64> {
461 let wrapped_mq = self.queue_with_namespace(message_queue);
462 self.impl_().committed(&wrapped_mq).await
463 }
464
465 async fn commit_all(&self) -> RocketMQResult<()> {
466 self.impl_().commit_all().await
467 }
468
469 async fn commit_with_map(&self, offset_map: HashMap<MessageQueue, i64>, persist: bool) {
470 let mut wrapped_offsets = HashMap::with_capacity(offset_map.len());
472 for (mq, offset) in offset_map {
473 wrapped_offsets.insert(self.queue_with_namespace(&mq), offset);
474 }
475
476 if let Err(e) = self.impl_().commit(wrapped_offsets, persist).await {
477 tracing::warn!("commit_with_map failed: {}", e);
478 }
479 }
480
481 async fn commit(&self) {
482 if let Err(e) = self.commit_all().await {
483 tracing::warn!("commit failed: {}", e);
484 }
485 }
486
487 async fn commit_with_set(&self, message_queues: HashSet<MessageQueue>, persist: bool) {
488 let wrapped_queues: HashSet<_> = message_queues.iter().map(|mq| self.queue_with_namespace(mq)).collect();
490
491 let mut offset_map = HashMap::new();
493 for mq in &wrapped_queues {
494 if let Ok(offset) = self.impl_().committed(mq).await {
495 offset_map.insert(mq.clone(), offset);
496 }
497 }
498
499 if let Err(e) = self.impl_().commit(offset_map, persist).await {
500 tracing::warn!("commit_with_set failed: {}", e);
501 }
502 }
503
504 async fn commit_sync(&self) {
505 if let Err(e) = self.commit_all().await {
507 tracing::warn!("commit_sync failed: {}", e);
508 }
509 }
510
511 async fn commit_sync_with_map(&self, offset_map: HashMap<MessageQueue, i64>, persist: bool) {
512 self.commit_with_map(offset_map, persist).await;
514 }
515
516 async fn is_auto_commit(&self) -> bool {
517 self.consumer_config.auto_commit
518 }
519
520 async fn set_auto_commit(&self, auto_commit: bool) {
521 self.consumer_config.mut_from_ref().auto_commit = auto_commit;
522 }
523
524 async fn offset_for_timestamp(&self, message_queue: &MessageQueue, timestamp: u64) -> RocketMQResult<i64> {
525 let wrapped_mq = self.queue_with_namespace(message_queue);
526 self.impl_().offset_for_timestamp(&wrapped_mq, timestamp).await
527 }
528
529 async fn pause(&self, message_queues: Vec<MessageQueue>) {
530 let wrapped_queues: Vec<_> = message_queues.iter().map(|mq| self.queue_with_namespace(mq)).collect();
531
532 self.impl_().pause(&wrapped_queues).await;
533 }
534
535 async fn resume(&self, message_queues: Vec<MessageQueue>) {
536 let wrapped_queues: Vec<_> = message_queues.iter().map(|mq| self.queue_with_namespace(mq)).collect();
537
538 self.impl_().resume(&wrapped_queues).await;
539 }
540
541 async fn is_paused(&self, message_queue: &MessageQueue) -> bool {
542 let wrapped_mq = self.queue_with_namespace(message_queue);
543 self.impl_().is_paused(&wrapped_mq).await
544 }
545
546 async fn seek(&self, message_queue: &MessageQueue, offset: i64) -> RocketMQResult<()> {
547 let wrapped_mq = self.queue_with_namespace(message_queue);
548 self.impl_().seek(&wrapped_mq, offset).await
549 }
550
551 async fn seek_to_begin(&self, message_queue: &MessageQueue) -> RocketMQResult<()> {
552 let wrapped_mq = self.queue_with_namespace(message_queue);
553 self.impl_().seek_to_begin(&wrapped_mq).await
554 }
555
556 async fn seek_to_end(&self, message_queue: &MessageQueue) -> RocketMQResult<()> {
557 let wrapped_mq = self.queue_with_namespace(message_queue);
558 self.impl_().seek_to_end(&wrapped_mq).await
559 }
560
561 async fn update_name_server_address(&self, name_server_address: &str) {
562 self.client_config
563 .mut_from_ref()
564 .set_namesrv_addr(CheetahString::from_string(name_server_address.to_string()));
565
566 if let Some(impl_) = self.default_lite_pull_consumer_impl.get() {
567 let addresses: Vec<String> = name_server_address
568 .split(';')
569 .map(|s| s.trim().to_string())
570 .filter(|s| !s.is_empty())
571 .collect();
572 impl_.update_name_server_address(addresses).await;
573 }
574 }
575
576 async fn register_topic_message_queue_change_listener<TL>(&self, topic: &str, listener: TL) -> RocketMQResult<()>
577 where
578 TL: TopicMessageQueueChangeListener + 'static,
579 {
580 let wrapped_topic = self.with_namespace(topic);
581 let listener_arc: Arc<dyn TopicMessageQueueChangeListener + Send + Sync> = Arc::new(listener);
582 self.impl_()
583 .register_topic_message_queue_change_listener(wrapped_topic, listener_arc)
584 .await
585 }
586}
587
588impl MQConsumer for DefaultLitePullConsumer {
589 async fn send_message_back(&mut self, msg: MessageExt, delay_level: i32, broker_name: &str) -> RocketMQResult<()> {
590 Err(crate::mq_client_err!(
593 -1,
594 "sendMessageBack is not supported in lite pull consumer"
595 ))
596 }
597
598 async fn fetch_subscribe_message_queues(&mut self, topic: &str) -> RocketMQResult<Vec<MessageQueue>> {
599 let queues = self.fetch_message_queues(topic).await?;
600 Ok(queues.into_iter().collect())
601 }
602}