ordermessage_consumer/
ordermessage_consumer.rs1use std::sync::atomic::AtomicI64;
18use std::sync::Arc;
19
20use rocketmq_client_rust::consumer::default_mq_push_consumer::DefaultMQPushConsumer;
21#[allow(unused_imports)]
22use rocketmq_client_rust::consumer::listener::consume_concurrently_status::ConsumeConcurrentlyStatus;
23use rocketmq_client_rust::consumer::listener::consume_orderly_context::ConsumeOrderlyContext;
24use rocketmq_client_rust::consumer::listener::consume_orderly_status::ConsumeOrderlyStatus;
25use rocketmq_client_rust::consumer::listener::message_listener_orderly::MessageListenerOrderly;
26use rocketmq_client_rust::consumer::mq_push_consumer::MQPushConsumer;
27use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere;
28use rocketmq_common::common::message::message_ext::MessageExt;
29use rocketmq_error::RocketMQResult;
30use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
31use rocketmq_rust::rocketmq;
32use tracing::info;
33
/// Name-server address handed to the consumer builder in `main`.
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
/// Consumer group handed to the consumer builder in `main`.
pub const CONSUMER_GROUP: &str = "please_rename_unique_group_name_3";
/// Topic the consumer subscribes to.
pub const TOPIC: &str = "TopicTest";
/// Subscription expression passed to `subscribe` alongside [`TOPIC`]
/// (presumably `*` selects every tag; confirm against the client docs).
pub const TAG: &str = "*";
/// Message count for the example; not referenced by the code visible in this file.
pub const MESSAGE_COUNT: usize = 1;
39
40#[rocketmq::main]
41pub async fn main() -> RocketMQResult<()> {
42 rocketmq_common::log::init_logger()?;
44
45 let builder = DefaultMQPushConsumer::builder();
47
48 let mut consumer = builder
49 .consumer_group(CONSUMER_GROUP.to_string())
50 .name_server_addr(DEFAULT_NAMESRVADDR.to_string())
51 .message_model(MessageModel::Clustering)
52 .build();
53 consumer.subscribe(TOPIC, TAG)?;
54 consumer.set_consume_from_where(ConsumeFromWhere::ConsumeFromFirstOffset);
55 consumer.register_message_listener_orderly(MyMessageListener::new());
56 consumer.start().await?;
57 let _ = tokio::signal::ctrl_c().await;
58 Ok(())
59}
60
/// Orderly message listener used by this example.
///
/// Derives `Debug` so the public type can be logged and inspected.
#[derive(Debug)]
pub struct MyMessageListener {
    /// Counter read by `consume_message` to decide which status to return.
    consume_times: Arc<AtomicI64>,
}
64
65impl Default for MyMessageListener {
66 fn default() -> Self {
67 Self::new()
68 }
69}
70
71impl MyMessageListener {
72 pub fn new() -> Self {
73 Self {
74 consume_times: Arc::new(AtomicI64::new(0)),
75 }
76 }
77}
78
79impl MessageListenerOrderly for MyMessageListener {
80 fn consume_message(
81 &self,
82 msgs: &[&MessageExt],
83 context: &mut ConsumeOrderlyContext,
84 ) -> RocketMQResult<ConsumeOrderlyStatus> {
85 context.set_auto_commit(true);
86 for msg in msgs {
87 println!("Receive message: {:?}", msg);
88 info!("Receive message: {:?}", msg);
89 }
90 if self
91 .consume_times
92 .load(std::sync::atomic::Ordering::Acquire)
93 % 2
94 == 0
95 {
96 return Ok(ConsumeOrderlyStatus::Success);
97 } else if self
98 .consume_times
99 .load(std::sync::atomic::Ordering::Acquire)
100 % 5
101 == 0
102 {
103 context.set_suspend_current_queue_time_millis(3000);
104 return Ok(ConsumeOrderlyStatus::SuspendCurrentQueueAMoment);
105 }
106 Ok(ConsumeOrderlyStatus::Success)
107 }
108}