rocketmq_client_rust/consumer/
pop_callback.rs1use std::future::Future;
16use std::pin::Pin;
17use std::sync::Arc;
18
19use rocketmq_common::common::message::message_queue::MessageQueue;
20use rocketmq_common::common::mix_all;
21use rocketmq_error::RocketmqError;
22use rocketmq_remoting::code::response_code::ResponseCode;
23use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
24use rocketmq_rust::ArcMut;
25use tracing::warn;
26
27use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPushConsumerImpl;
28use crate::consumer::consumer_impl::default_mq_push_consumer_impl::PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL;
29use crate::consumer::consumer_impl::pop_request::PopRequest;
30use crate::consumer::pop_result::PopResult;
31use crate::consumer::pop_status::PopStatus;
32
33#[trait_variant::make(PopCallback: Send)]
37pub trait PopCallbackInner {
38 async fn on_success(&mut self, pop_result: PopResult);
44
45 fn on_error(&mut self, e: Box<dyn std::error::Error + Send>);
51}
52
53pub type PopCallbackFn = Arc<
81 dyn Fn(Option<PopResult>, Option<Box<dyn std::error::Error>>) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>>
82 + Send
83 + Sync,
84>;
85
86pub struct DefaultPopCallback {
87 pub(crate) push_consumer_impl: ArcMut<DefaultMQPushConsumerImpl>,
88 pub(crate) message_queue_inner: Option<MessageQueue>,
89 pub(crate) subscription_data: Option<SubscriptionData>,
90 pub(crate) pop_request: Option<PopRequest>,
91}
92
93impl PopCallback for DefaultPopCallback {
94 async fn on_success(&mut self, pop_result: PopResult) {
95 let mut push_consumer_impl = self.push_consumer_impl.clone();
96
97 let message_queue_inner = self.message_queue_inner.take().unwrap();
98 let subscription_data = self.subscription_data.take().unwrap();
99 let pop_request = self.pop_request.take().unwrap();
100
101 let pop_result = push_consumer_impl
102 .process_pop_result(pop_result, &subscription_data)
103 .await;
104 match pop_result.pop_status {
105 PopStatus::Found => {
106 if pop_result.msg_found_list.as_ref().is_none_or(|value| value.is_empty()) {
107 push_consumer_impl.execute_pop_request_immediately(pop_request).await;
108 } else {
109 push_consumer_impl
110 .consume_message_pop_service
111 .as_mut()
112 .unwrap()
113 .submit_pop_consume_request(
114 pop_result.msg_found_list.unwrap_or_default(),
115 pop_request.get_pop_process_queue(),
116 pop_request.get_message_queue(),
117 )
118 .await;
119 let pull_interval = push_consumer_impl.consumer_config.pull_interval;
120 if pull_interval > 0 {
121 push_consumer_impl.execute_pop_request_later(pop_request, pull_interval);
122 } else {
123 push_consumer_impl.execute_pop_request_immediately(pop_request).await;
124 }
125 }
126 }
127 PopStatus::NoNewMsg | PopStatus::PollingNotFound => {
128 push_consumer_impl.execute_pop_request_immediately(pop_request).await;
129 }
130 PopStatus::PollingFull => {
131 let pull_time_delay_mills_when_exception = push_consumer_impl.pull_time_delay_mills_when_exception;
132 push_consumer_impl.execute_pop_request_later(pop_request, pull_time_delay_mills_when_exception);
133 }
134 }
135 }
136
137 fn on_error(&mut self, err: Box<dyn std::error::Error + Send>) {
138 let mut push_consumer_impl = self.push_consumer_impl.clone();
139
140 let message_queue_inner = self.message_queue_inner.take().unwrap();
141 let pop_request = self.pop_request.take().unwrap();
142 let topic = message_queue_inner.topic_str();
143 if !topic.starts_with(mix_all::RETRY_GROUP_TOPIC_PREFIX) {
144 if let Some(er) = err.downcast_ref::<RocketmqError>() {
145 match er {
146 RocketmqError::MQClientBrokerError(broker_error) => {
147 if ResponseCode::from(broker_error.response_code()) == ResponseCode::SubscriptionNotLatest {
148 warn!(
149 "the subscription is not latest, group={}",
150 push_consumer_impl.consumer_config.consumer_group,
151 );
152 } else {
153 warn!(
154 "execute the pop request exception, group={}",
155 push_consumer_impl.consumer_config.consumer_group
156 );
157 }
158 }
159 _ => {
160 warn!(
161 "execute the pop request exception, group={}",
162 push_consumer_impl.consumer_config.consumer_group
163 );
164 }
165 }
166 } else {
167 warn!(
168 "execute the pull request exception, group={}",
169 push_consumer_impl.consumer_config.consumer_group
170 );
171 }
172 }
173 let time_delay = if let Some(er) = err.downcast_ref::<RocketmqError>() {
174 match er {
175 RocketmqError::MQClientBrokerError(broker_error) => {
176 if ResponseCode::from(broker_error.response_code()) == ResponseCode::FlowControl {
177 PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL
178 } else {
179 push_consumer_impl.pull_time_delay_mills_when_exception
180 }
181 }
182 _ => push_consumer_impl.pull_time_delay_mills_when_exception,
183 }
184 } else {
185 push_consumer_impl.pull_time_delay_mills_when_exception
186 };
187
188 push_consumer_impl.execute_pop_request_later(pop_request, time_delay);
189 }
190}