rocketmq_client_rust/consumer/consumer_impl/
pull_message_service.rs1use std::sync::atomic::AtomicBool;
16use std::sync::atomic::Ordering;
17use std::sync::Arc;
18use std::time::Duration;
19
20use rocketmq_common::common::message::message_enum::MessageRequestMode;
21use rocketmq_error::RocketMQError;
22use rocketmq_rust::ArcMut;
23use rocketmq_rust::Shutdown;
24use tracing::error;
25use tracing::info;
26use tracing::warn;
27
28use crate::consumer::consumer_impl::message_request::MessageRequest;
29use crate::consumer::consumer_impl::pop_request::PopRequest;
30use crate::consumer::consumer_impl::pull_request::PullRequest;
31use crate::factory::mq_client_instance::MQClientInstance;
32
33const DEFAULT_QUEUE_CAPACITY: usize = 4096;
35
36const DEFAULT_SHUTDOWN_TIMEOUT_MS: u64 = 1000;
38
39#[derive(Clone)]
55pub struct PullMessageService {
56 tx: Option<tokio::sync::mpsc::Sender<Box<dyn MessageRequest + Send + 'static>>>,
58
59 tx_shutdown: Option<tokio::sync::broadcast::Sender<()>>,
61
62 stopped: Arc<AtomicBool>,
64
65 queue_capacity: usize,
67}
68
69impl PullMessageService {
70 pub fn new() -> Self {
72 Self::with_capacity(DEFAULT_QUEUE_CAPACITY)
73 }
74
75 pub fn with_capacity(queue_capacity: usize) -> Self {
77 PullMessageService {
78 tx: None,
79 tx_shutdown: None,
80 stopped: Arc::new(AtomicBool::new(false)),
81 queue_capacity,
82 }
83 }
84
85 #[inline]
87 pub fn is_stopped(&self) -> bool {
88 self.stopped.load(Ordering::Acquire)
89 }
90
91 #[inline]
93 pub fn get_service_name(&self) -> &'static str {
94 "PullMessageService"
95 }
96
97 pub async fn start(&mut self, mut instance: ArcMut<MQClientInstance>) -> Result<(), RocketMQError> {
105 if self.tx.is_some() {
106 warn!("{} already started", self.get_service_name());
107 return Ok(());
108 }
109
110 let (tx, mut rx) = tokio::sync::mpsc::channel::<Box<dyn MessageRequest + Send + 'static>>(self.queue_capacity);
111 let (mut shutdown, tx_shutdown) = Shutdown::new(1);
112
113 self.tx = Some(tx);
114 self.tx_shutdown = Some(tx_shutdown);
115 self.stopped.store(false, Ordering::Release);
116
117 tokio::spawn(async move {
118 info!("{} service started", "PullMessageService");
119
120 loop {
121 tokio::select! {
122 _ = shutdown.recv() => {
123 info!("{} received shutdown signal", "PullMessageService");
124 break;
125 }
126 Some(request) = rx.recv() => {
127 if let Err(e) = Self::process_request(request, &mut instance).await {
129 error!("{} failed to process request: {:?}", "PullMessageService", e);
130 }
131 }
132 }
133 }
134
135 info!("{} service end", "PullMessageService");
136 });
137
138 Ok(())
139 }
140
141 async fn process_request(
150 request: Box<dyn MessageRequest + Send + 'static>,
151 instance: &mut MQClientInstance,
152 ) -> Result<(), RocketMQError> {
153 match request.get_message_request_mode() {
154 MessageRequestMode::Pull => {
155 if let Ok(pull_request) = request.into_any().downcast::<PullRequest>() {
157 Self::pull_message(*pull_request, instance).await;
158 Ok(())
159 } else {
160 Err(RocketMQError::Internal("Failed to downcast to PullRequest".to_string()))
161 }
162 }
163 MessageRequestMode::Pop => {
164 if let Ok(pop_request) = request.into_any().downcast::<PopRequest>() {
165 Self::pop_message(*pop_request, instance).await;
166 Ok(())
167 } else {
168 Err(RocketMQError::Internal("Failed to downcast to PopRequest".to_string()))
169 }
170 }
171 }
172 }
173
174 async fn pull_message(request: PullRequest, instance: &mut MQClientInstance) {
176 if let Some(mut consumer) = instance.select_consumer(request.get_consumer_group()).await {
177 consumer.pull_message(request).await;
178 } else {
179 warn!("No matched consumer for the PullRequest {}, drop it", request)
180 }
181 }
182
183 async fn pop_message(request: PopRequest, instance: &mut MQClientInstance) {
185 if let Some(mut consumer) = instance.select_consumer(request.get_consumer_group()).await {
186 consumer.pop_message(request).await;
187 } else {
188 warn!("No matched consumer for the PopRequest {}, drop it", request)
189 }
190 }
191
192 pub fn execute_pull_request_later(&self, pull_request: PullRequest, time_delay: u64) {
202 if self.is_stopped() {
203 warn!("{} has shutdown, cannot execute later task", self.get_service_name());
204 return;
205 }
206
207 let this = self.clone();
208 let request = pull_request.clone();
209
210 tokio::spawn(async move {
212 tokio::time::sleep(Duration::from_millis(time_delay)).await;
213
214 if this.is_stopped() {
215 return;
216 }
217
218 if let Some(tx) = &this.tx {
219 if let Err(e) = tx.send(Box::new(request)).await {
220 warn!("Failed to send pull request: {:?}", e);
221 }
222 }
223 });
224 }
225
226 pub async fn execute_pull_request_immediately(&self, pull_request: PullRequest) {
234 if self.is_stopped() {
235 warn!("PullMessageService has shutdown");
236 return;
237 }
238
239 if let Some(tx) = &self.tx {
240 if let Err(e) = tx.send(Box::new(pull_request)).await {
241 error!("executePullRequestImmediately messageRequestQueue.put error: {:?}", e);
242 }
243 } else {
244 warn!("PullMessageService not started");
245 }
246 }
247
248 pub fn execute_pop_pull_request_later(&self, pop_request: PopRequest, time_delay: u64) {
250 if self.is_stopped() {
251 warn!("{} has shutdown, cannot execute later task", self.get_service_name());
252 return;
253 }
254
255 let this = self.clone();
256 let request = pop_request.clone();
257
258 tokio::spawn(async move {
260 tokio::time::sleep(Duration::from_millis(time_delay)).await;
261
262 if this.is_stopped() {
263 return;
264 }
265
266 if let Some(tx) = &this.tx {
267 if let Err(e) = tx.send(Box::new(request)).await {
268 warn!("Failed to send pop request: {:?}", e);
269 }
270 }
271 });
272 }
273
274 pub async fn execute_pop_pull_request_immediately(&self, pop_request: PopRequest) {
276 if self.is_stopped() {
277 warn!("PullMessageService has shutdown");
278 return;
279 }
280
281 if let Some(tx) = &self.tx {
282 if let Err(e) = tx.send(Box::new(pop_request)).await {
283 error!(
284 "executePopPullRequestImmediately messageRequestQueue.put error: {:?}",
285 e
286 );
287 }
288 } else {
289 warn!("PullMessageService not started");
290 }
291 }
292
293 pub fn execute_task_later<F>(&self, task: F, time_delay: u64)
299 where
300 F: FnOnce() + Send + 'static,
301 {
302 if self.is_stopped() {
303 warn!("{} has shutdown, cannot execute task", self.get_service_name());
304 return;
305 }
306
307 tokio::spawn(async move {
308 tokio::time::sleep(Duration::from_millis(time_delay)).await;
309 task();
310 });
311 }
312
313 pub fn execute_task<F>(&self, task: F)
315 where
316 F: FnOnce() + Send + 'static,
317 {
318 if self.is_stopped() {
319 warn!("{} has shutdown, cannot execute task", self.get_service_name());
320 return;
321 }
322
323 tokio::spawn(async move {
324 task();
325 });
326 }
327
328 pub async fn shutdown(&self, timeout_ms: u64) -> Result<(), RocketMQError> {
339 if self.is_stopped() {
340 warn!("{} already stopped", self.get_service_name());
341 return Ok(());
342 }
343
344 info!("{} shutting down...", self.get_service_name());
345
346 self.stopped.store(true, Ordering::Release);
348
349 if let Some(tx_shutdown) = &self.tx_shutdown {
351 tx_shutdown
352 .send(())
353 .map_err(|_| RocketMQError::Internal("Failed to send shutdown signal".to_string()))?;
354 }
355
356 let wait_result = tokio::time::timeout(Duration::from_millis(timeout_ms), async {
358 while self.tx.as_ref().map(|tx| !tx.is_closed()).unwrap_or(false) {
360 tokio::time::sleep(Duration::from_millis(10)).await;
361 }
362 })
363 .await;
364
365 if wait_result.is_err() {
366 warn!(
367 "{} shutdown timeout after {}ms, forcing exit",
368 self.get_service_name(),
369 timeout_ms
370 );
371 }
372
373 info!("{} shutdown completed", self.get_service_name());
374 Ok(())
375 }
376
377 pub async fn shutdown_default(&self) -> Result<(), RocketMQError> {
379 self.shutdown(DEFAULT_SHUTDOWN_TIMEOUT_MS).await
380 }
381}
382
383impl Default for PullMessageService {
384 fn default() -> Self {
385 Self::new()
386 }
387}