Skip to main content

rocketmq_client_rust/consumer/
pop_callback.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16use std::pin::Pin;
17use std::sync::Arc;
18
19use rocketmq_common::common::message::message_queue::MessageQueue;
20use rocketmq_common::common::mix_all;
21use rocketmq_error::RocketmqError;
22use rocketmq_remoting::code::response_code::ResponseCode;
23use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
24use rocketmq_rust::ArcMut;
25use tracing::warn;
26
27use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPushConsumerImpl;
28use crate::consumer::consumer_impl::default_mq_push_consumer_impl::PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL;
29use crate::consumer::consumer_impl::pop_request::PopRequest;
30use crate::consumer::pop_result::PopResult;
31use crate::consumer::pop_status::PopStatus;
32
33/// Trait for handling the results of a pop operation.
34///
35/// This trait defines the methods for handling successful and error results of a pop operation.
36#[trait_variant::make(PopCallback: Send)]
37pub trait PopCallbackInner {
38    /// Called when the pop operation is successful.
39    ///
40    /// # Arguments
41    ///
42    /// * `pop_result` - The result of the pop operation.
43    async fn on_success(&mut self, pop_result: PopResult);
44
45    /// Called when the pop operation encounters an error.
46    ///
47    /// # Arguments
48    ///
49    /// * `e` - The error encountered during the pop operation.
50    fn on_error(&mut self, e: Box<dyn std::error::Error + Send>);
51}
52
53/*impl<F, Fut> PopCallback for F
54where
55    F: Fn(Option<PopResult>, Option<Box<dyn std::error::Error + Send>>) -> Fut + Send + Sync,
56    Fut: Future<Output = ()> + Send,
57{
58    /// Calls the function with the pop result when the pop operation is successful.
59    ///
60    /// # Arguments
61    ///
62    /// * `pop_result` - The result of the pop operation.
63    async fn on_success(&mut self, pop_result: PopResult) {
64        (*self)(Some(pop_result), None).await;
65    }
66
67    /// Does nothing when the pop operation encounters an error.
68    ///
69    /// # Arguments
70    ///
71    /// * `e` - The error encountered during the pop operation.
72    fn on_error(&self, e: Box<dyn std::error::Error + Send>) {
73        (*self)(None, Some(e));
74    }
75}*/
76
77/// Type alias for a callback function that handles the result of a pop operation.
78///
79/// This type alias defines a callback function that takes a `PopResult` and returns a boxed future.
80pub type PopCallbackFn = Arc<
81    dyn Fn(Option<PopResult>, Option<Box<dyn std::error::Error>>) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>>
82        + Send
83        + Sync,
84>;
85
86pub struct DefaultPopCallback {
87    pub(crate) push_consumer_impl: ArcMut<DefaultMQPushConsumerImpl>,
88    pub(crate) message_queue_inner: Option<MessageQueue>,
89    pub(crate) subscription_data: Option<SubscriptionData>,
90    pub(crate) pop_request: Option<PopRequest>,
91}
92
93impl PopCallback for DefaultPopCallback {
94    async fn on_success(&mut self, pop_result: PopResult) {
95        let mut push_consumer_impl = self.push_consumer_impl.clone();
96
97        let message_queue_inner = self.message_queue_inner.take().unwrap();
98        let subscription_data = self.subscription_data.take().unwrap();
99        let pop_request = self.pop_request.take().unwrap();
100
101        let pop_result = push_consumer_impl
102            .process_pop_result(pop_result, &subscription_data)
103            .await;
104        match pop_result.pop_status {
105            PopStatus::Found => {
106                if pop_result.msg_found_list.as_ref().is_none_or(|value| value.is_empty()) {
107                    push_consumer_impl.execute_pop_request_immediately(pop_request).await;
108                } else {
109                    push_consumer_impl
110                        .consume_message_pop_service
111                        .as_mut()
112                        .unwrap()
113                        .submit_pop_consume_request(
114                            pop_result.msg_found_list.unwrap_or_default(),
115                            pop_request.get_pop_process_queue(),
116                            pop_request.get_message_queue(),
117                        )
118                        .await;
119                    let pull_interval = push_consumer_impl.consumer_config.pull_interval;
120                    if pull_interval > 0 {
121                        push_consumer_impl.execute_pop_request_later(pop_request, pull_interval);
122                    } else {
123                        push_consumer_impl.execute_pop_request_immediately(pop_request).await;
124                    }
125                }
126            }
127            PopStatus::NoNewMsg | PopStatus::PollingNotFound => {
128                push_consumer_impl.execute_pop_request_immediately(pop_request).await;
129            }
130            PopStatus::PollingFull => {
131                let pull_time_delay_mills_when_exception = push_consumer_impl.pull_time_delay_mills_when_exception;
132                push_consumer_impl.execute_pop_request_later(pop_request, pull_time_delay_mills_when_exception);
133            }
134        }
135    }
136
137    fn on_error(&mut self, err: Box<dyn std::error::Error + Send>) {
138        let mut push_consumer_impl = self.push_consumer_impl.clone();
139
140        let message_queue_inner = self.message_queue_inner.take().unwrap();
141        let pop_request = self.pop_request.take().unwrap();
142        let topic = message_queue_inner.topic_str();
143        if !topic.starts_with(mix_all::RETRY_GROUP_TOPIC_PREFIX) {
144            if let Some(er) = err.downcast_ref::<RocketmqError>() {
145                match er {
146                    RocketmqError::MQClientBrokerError(broker_error) => {
147                        if ResponseCode::from(broker_error.response_code()) == ResponseCode::SubscriptionNotLatest {
148                            warn!(
149                                "the subscription is not latest, group={}",
150                                push_consumer_impl.consumer_config.consumer_group,
151                            );
152                        } else {
153                            warn!(
154                                "execute the pop request exception, group={}",
155                                push_consumer_impl.consumer_config.consumer_group
156                            );
157                        }
158                    }
159                    _ => {
160                        warn!(
161                            "execute the pop request exception, group={}",
162                            push_consumer_impl.consumer_config.consumer_group
163                        );
164                    }
165                }
166            } else {
167                warn!(
168                    "execute the pull request exception, group={}",
169                    push_consumer_impl.consumer_config.consumer_group
170                );
171            }
172        }
173        let time_delay = if let Some(er) = err.downcast_ref::<RocketmqError>() {
174            match er {
175                RocketmqError::MQClientBrokerError(broker_error) => {
176                    if ResponseCode::from(broker_error.response_code()) == ResponseCode::FlowControl {
177                        PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL
178                    } else {
179                        push_consumer_impl.pull_time_delay_mills_when_exception
180                    }
181                }
182                _ => push_consumer_impl.pull_time_delay_mills_when_exception,
183            }
184        } else {
185            push_consumer_impl.pull_time_delay_mills_when_exception
186        };
187
188        push_consumer_impl.execute_pop_request_later(pop_request, time_delay);
189    }
190}