Skip to main content

rocketmq_client_rust/consumer/consumer_impl/
pull_message_service.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::atomic::AtomicBool;
16use std::sync::atomic::Ordering;
17use std::sync::Arc;
18use std::time::Duration;
19
20use rocketmq_common::common::message::message_enum::MessageRequestMode;
21use rocketmq_error::RocketMQError;
22use rocketmq_rust::ArcMut;
23use rocketmq_rust::Shutdown;
24use tracing::error;
25use tracing::info;
26use tracing::warn;
27
28use crate::consumer::consumer_impl::message_request::MessageRequest;
29use crate::consumer::consumer_impl::pop_request::PopRequest;
30use crate::consumer::consumer_impl::pull_request::PullRequest;
31use crate::factory::mq_client_instance::MQClientInstance;
32
33/// Default queue capacity for message requests
34const DEFAULT_QUEUE_CAPACITY: usize = 4096;
35
36/// Default shutdown timeout in milliseconds
37const DEFAULT_SHUTDOWN_TIMEOUT_MS: u64 = 1000;
38
39/// RocketMQ Consumer Pull Message Service
40///
41/// # Responsibilities
42/// - Asynchronously schedules Pull/Pop message requests
43/// - Supports both delayed and immediate scheduling
44/// - Manages lifecycle of background pull tasks
45///
46/// # Thread Model
47/// - Main loop: Single Tokio task
48/// - Delayed tasks: Individual tokio::spawn tasks
49///
50/// # Shutdown Semantics
51/// - `shutdown()` sends stop signal and waits for graceful termination
52/// - Main loop processes current request before exiting
53/// - Delayed tasks check `is_stopped()` before execution
54#[derive(Clone)]
55pub struct PullMessageService {
56    /// Message request channel sender
57    tx: Option<tokio::sync::mpsc::Sender<Box<dyn MessageRequest + Send + 'static>>>,
58
59    /// Shutdown signal broadcaster
60    tx_shutdown: Option<tokio::sync::broadcast::Sender<()>>,
61
62    /// Service stopped flag (for fast check)
63    stopped: Arc<AtomicBool>,
64
65    /// Queue capacity
66    queue_capacity: usize,
67}
68
69impl PullMessageService {
70    /// Creates a new PullMessageService instance
71    pub fn new() -> Self {
72        Self::with_capacity(DEFAULT_QUEUE_CAPACITY)
73    }
74
75    /// Creates a new PullMessageService instance with custom queue capacity
76    pub fn with_capacity(queue_capacity: usize) -> Self {
77        PullMessageService {
78            tx: None,
79            tx_shutdown: None,
80            stopped: Arc::new(AtomicBool::new(false)),
81            queue_capacity,
82        }
83    }
84
85    /// Checks if the service is stopped
86    #[inline]
87    pub fn is_stopped(&self) -> bool {
88        self.stopped.load(Ordering::Acquire)
89    }
90
91    /// Gets service name
92    #[inline]
93    pub fn get_service_name(&self) -> &'static str {
94        "PullMessageService"
95    }
96
97    /// Starts the pull message service
98    ///
99    /// # Arguments
100    /// * `instance` - MQClientInstance for message processing
101    ///
102    /// # Errors
103    /// Returns error if service is already started
104    pub async fn start(&mut self, mut instance: ArcMut<MQClientInstance>) -> Result<(), RocketMQError> {
105        if self.tx.is_some() {
106            warn!("{} already started", self.get_service_name());
107            return Ok(());
108        }
109
110        let (tx, mut rx) = tokio::sync::mpsc::channel::<Box<dyn MessageRequest + Send + 'static>>(self.queue_capacity);
111        let (mut shutdown, tx_shutdown) = Shutdown::new(1);
112
113        self.tx = Some(tx);
114        self.tx_shutdown = Some(tx_shutdown);
115        self.stopped.store(false, Ordering::Release);
116
117        tokio::spawn(async move {
118            info!("{} service started", "PullMessageService");
119
120            loop {
121                tokio::select! {
122                    _ = shutdown.recv() => {
123                        info!("{} received shutdown signal", "PullMessageService");
124                        break;
125                    }
126                    Some(request) = rx.recv() => {
127                        // Process request with exception handling
128                        if let Err(e) = Self::process_request(request, &mut instance).await {
129                            error!("{} failed to process request: {:?}", "PullMessageService", e);
130                        }
131                    }
132                }
133            }
134
135            info!("{} service end", "PullMessageService");
136        });
137
138        Ok(())
139    }
140
141    /// Processes a message request (Pull or Pop)
142    ///
143    /// # Arguments
144    /// * `request` - Message request to process
145    /// * `instance` - MQClientInstance for consumer lookup
146    ///
147    /// # Errors
148    /// Returns error if request processing fails
149    async fn process_request(
150        request: Box<dyn MessageRequest + Send + 'static>,
151        instance: &mut MQClientInstance,
152    ) -> Result<(), RocketMQError> {
153        match request.get_message_request_mode() {
154            MessageRequestMode::Pull => {
155                // Safe downcast using Any trait
156                if let Ok(pull_request) = request.into_any().downcast::<PullRequest>() {
157                    Self::pull_message(*pull_request, instance).await;
158                    Ok(())
159                } else {
160                    Err(RocketMQError::Internal("Failed to downcast to PullRequest".to_string()))
161                }
162            }
163            MessageRequestMode::Pop => {
164                if let Ok(pop_request) = request.into_any().downcast::<PopRequest>() {
165                    Self::pop_message(*pop_request, instance).await;
166                    Ok(())
167                } else {
168                    Err(RocketMQError::Internal("Failed to downcast to PopRequest".to_string()))
169                }
170            }
171        }
172    }
173
174    /// Handles pull message request
175    async fn pull_message(request: PullRequest, instance: &mut MQClientInstance) {
176        if let Some(mut consumer) = instance.select_consumer(request.get_consumer_group()).await {
177            consumer.pull_message(request).await;
178        } else {
179            warn!("No matched consumer for the PullRequest {}, drop it", request)
180        }
181    }
182
183    /// Handles pop message request
184    async fn pop_message(request: PopRequest, instance: &mut MQClientInstance) {
185        if let Some(mut consumer) = instance.select_consumer(request.get_consumer_group()).await {
186            consumer.pop_message(request).await;
187        } else {
188            warn!("No matched consumer for the PopRequest {}, drop it", request)
189        }
190    }
191
192    /// Executes pull request with delay
193    ///
194    /// # Arguments
195    /// * `pull_request` - The pull request to execute
196    /// * `time_delay` - Delay in milliseconds before execution
197    ///
198    /// # Behavior
199    /// - Returns immediately if service is stopped
200    /// - Spawns a tokio task that sleeps then sends the request
201    pub fn execute_pull_request_later(&self, pull_request: PullRequest, time_delay: u64) {
202        if self.is_stopped() {
203            warn!("{} has shutdown, cannot execute later task", self.get_service_name());
204            return;
205        }
206
207        let this = self.clone();
208        let request = pull_request.clone();
209
210        // Use a one-shot scheduled task
211        tokio::spawn(async move {
212            tokio::time::sleep(Duration::from_millis(time_delay)).await;
213
214            if this.is_stopped() {
215                return;
216            }
217
218            if let Some(tx) = &this.tx {
219                if let Err(e) = tx.send(Box::new(request)).await {
220                    warn!("Failed to send pull request: {:?}", e);
221                }
222            }
223        });
224    }
225
226    /// Executes pull request immediately
227    ///
228    /// # Arguments
229    /// * `pull_request` - The pull request to execute
230    ///
231    /// # Behavior
232    /// Logs error but does not return error (aligned with Java implementation)
233    pub async fn execute_pull_request_immediately(&self, pull_request: PullRequest) {
234        if self.is_stopped() {
235            warn!("PullMessageService has shutdown");
236            return;
237        }
238
239        if let Some(tx) = &self.tx {
240            if let Err(e) = tx.send(Box::new(pull_request)).await {
241                error!("executePullRequestImmediately messageRequestQueue.put error: {:?}", e);
242            }
243        } else {
244            warn!("PullMessageService not started");
245        }
246    }
247
248    /// Executes pop request with delay
249    pub fn execute_pop_pull_request_later(&self, pop_request: PopRequest, time_delay: u64) {
250        if self.is_stopped() {
251            warn!("{} has shutdown, cannot execute later task", self.get_service_name());
252            return;
253        }
254
255        let this = self.clone();
256        let request = pop_request.clone();
257
258        // Use a one-shot scheduled task
259        tokio::spawn(async move {
260            tokio::time::sleep(Duration::from_millis(time_delay)).await;
261
262            if this.is_stopped() {
263                return;
264            }
265
266            if let Some(tx) = &this.tx {
267                if let Err(e) = tx.send(Box::new(request)).await {
268                    warn!("Failed to send pop request: {:?}", e);
269                }
270            }
271        });
272    }
273
274    /// Executes pop request immediately
275    pub async fn execute_pop_pull_request_immediately(&self, pop_request: PopRequest) {
276        if self.is_stopped() {
277            warn!("PullMessageService has shutdown");
278            return;
279        }
280
281        if let Some(tx) = &self.tx {
282            if let Err(e) = tx.send(Box::new(pop_request)).await {
283                error!(
284                    "executePopPullRequestImmediately messageRequestQueue.put error: {:?}",
285                    e
286                );
287            }
288        } else {
289            warn!("PullMessageService not started");
290        }
291    }
292
293    /// Executes a generic task with delay (equivalent to Java's executeTaskLater)
294    ///
295    /// # Arguments
296    /// * `task` - Task function to execute
297    /// * `time_delay` - Delay in milliseconds before execution
298    pub fn execute_task_later<F>(&self, task: F, time_delay: u64)
299    where
300        F: FnOnce() + Send + 'static,
301    {
302        if self.is_stopped() {
303            warn!("{} has shutdown, cannot execute task", self.get_service_name());
304            return;
305        }
306
307        tokio::spawn(async move {
308            tokio::time::sleep(Duration::from_millis(time_delay)).await;
309            task();
310        });
311    }
312
313    /// Executes a generic task immediately (equivalent to Java's executeTask)
314    pub fn execute_task<F>(&self, task: F)
315    where
316        F: FnOnce() + Send + 'static,
317    {
318        if self.is_stopped() {
319            warn!("{} has shutdown, cannot execute task", self.get_service_name());
320            return;
321        }
322
323        tokio::spawn(async move {
324            task();
325        });
326    }
327
328    /// Gracefully shuts down the service
329    ///
330    /// # Arguments
331    /// * `timeout_ms` - Maximum time to wait for shutdown (milliseconds)
332    ///
333    /// # Behavior
334    /// - Sets stopped flag
335    /// - Sends shutdown signal to main loop
336    /// - Cancels all scheduled tasks
337    /// - Waits for main loop to finish (with timeout)
338    pub async fn shutdown(&self, timeout_ms: u64) -> Result<(), RocketMQError> {
339        if self.is_stopped() {
340            warn!("{} already stopped", self.get_service_name());
341            return Ok(());
342        }
343
344        info!("{} shutting down...", self.get_service_name());
345
346        // 1. Set stopped flag
347        self.stopped.store(true, Ordering::Release);
348
349        // 2. Send shutdown signal
350        if let Some(tx_shutdown) = &self.tx_shutdown {
351            tx_shutdown
352                .send(())
353                .map_err(|_| RocketMQError::Internal("Failed to send shutdown signal".to_string()))?;
354        }
355
356        // 3. Wait for main loop to exit (with timeout)
357        let wait_result = tokio::time::timeout(Duration::from_millis(timeout_ms), async {
358            // Wait for tx to be dropped (main loop exited)
359            while self.tx.as_ref().map(|tx| !tx.is_closed()).unwrap_or(false) {
360                tokio::time::sleep(Duration::from_millis(10)).await;
361            }
362        })
363        .await;
364
365        if wait_result.is_err() {
366            warn!(
367                "{} shutdown timeout after {}ms, forcing exit",
368                self.get_service_name(),
369                timeout_ms
370            );
371        }
372
373        info!("{} shutdown completed", self.get_service_name());
374        Ok(())
375    }
376
377    /// Shuts down with default timeout
378    pub async fn shutdown_default(&self) -> Result<(), RocketMQError> {
379        self.shutdown(DEFAULT_SHUTDOWN_TIMEOUT_MS).await
380    }
381}
382
383impl Default for PullMessageService {
384    fn default() -> Self {
385        Self::new()
386    }
387}