Skip to main content

rocketmq_client_rust/consumer/
lite_pull_consumer.rs

// Copyright 2023 The RocketMQ Rust Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::HashSet;

use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_common::common::message::message_queue::MessageQueue;

use crate::consumer::message_queue_listener::MessageQueueListener;
use crate::consumer::message_selector::MessageSelector;
use crate::consumer::topic_message_queue_change_listener::TopicMessageQueueChangeListener;

25/// A consumer that pulls messages from brokers on demand, providing explicit control over
26/// fetch timing, offset management, and queue assignment.
27///
28/// Unlike the push consumer, the caller drives message retrieval through [`poll`] or
29/// [`poll_with_timeout`], and may manage offsets manually when auto-commit is disabled.
30/// Queue assignment can be controlled either by subscribing to topics (broker-side rebalance)
31/// or by calling [`assign`] directly (client-controlled assignment).
32///
33/// All methods are asynchronous and do not block the calling thread.
34///
35/// [`poll`]: LitePullConsumerLocal::poll
36/// [`poll_with_timeout`]: LitePullConsumerLocal::poll_with_timeout
37/// [`assign`]: LitePullConsumerLocal::assign
38#[trait_variant::make(LitePullConsumer: Send)]
39pub trait LitePullConsumerLocal: Sync {
40    /// Starts the consumer and establishes connections to the broker and name server.
41    ///
42    /// This function does not block the calling thread.
43    ///
44    /// # Errors
45    ///
46    /// Returns an error if the consumer is already running, if required configuration is
47    /// invalid, or if the connection to the name server cannot be established.
48    async fn start(&self) -> rocketmq_error::RocketMQResult<()>;
49
50    /// Shuts down the consumer and releases all associated resources.
51    ///
52    /// This function does not block the calling thread. After shutdown, the consumer
53    /// cannot be restarted.
54    async fn shutdown(&self);
55
56    /// Returns whether the consumer is currently in the running state.
57    ///
58    /// This function does not block the calling thread.
59    async fn is_running(&self) -> bool;
60
61    /// Subscribes to the specified topic using the default subscription expression.
62    ///
63    /// This function does not block the calling thread.
64    ///
65    /// # Arguments
66    ///
67    /// * `topic` - The name of the topic to subscribe to.
68    ///
69    /// # Errors
70    ///
71    /// Returns an error if the topic name is invalid or if the subscription cannot be
72    /// registered with the broker.
73    async fn subscribe(&self, topic: &str) -> rocketmq_error::RocketMQResult<()>;
74
75    /// Subscribes to the specified topic with a tag-based or SQL-based filter expression.
76    ///
77    /// This function does not block the calling thread.
78    ///
79    /// # Arguments
80    ///
81    /// * `topic` - The name of the topic to subscribe to.
82    /// * `sub_expression` - A tag expression (e.g. `"TagA || TagB"`) or SQL-92 predicate. Pass
83    ///   `"*"` to receive all messages.
84    ///
85    /// # Errors
86    ///
87    /// Returns an error if the topic name is invalid, the expression cannot be parsed,
88    /// or the subscription cannot be registered with the broker.
89    async fn subscribe_with_expression(&self, topic: &str, sub_expression: &str) -> rocketmq_error::RocketMQResult<()>;
90
91    /// Subscribes to the specified topic with a filter expression and a queue-change listener.
92    ///
93    /// The listener is invoked whenever the set of assigned [`MessageQueue`]s changes for
94    /// this topic due to rebalance.
95    ///
96    /// This function does not block the calling thread.
97    ///
98    /// # Arguments
99    ///
100    /// * `topic` - The name of the topic to subscribe to.
101    /// * `sub_expression` - A tag expression or SQL-92 predicate. Pass `"*"` for all messages.
102    /// * `listener` - A [`MessageQueueListener`] notified on queue assignment changes.
103    ///
104    /// # Errors
105    ///
106    /// Returns an error if the topic name is invalid, the expression cannot be parsed,
107    /// or the subscription cannot be registered with the broker.
108    async fn subscribe_with_listener<MQL>(
109        &self,
110        topic: &str,
111        sub_expression: &str,
112        listener: MQL,
113    ) -> rocketmq_error::RocketMQResult<()>
114    where
115        MQL: MessageQueueListener + 'static;
116
117    /// Subscribes to the specified topic using a [`MessageSelector`] for server-side filtering.
118    ///
119    /// This function does not block the calling thread.
120    ///
121    /// # Arguments
122    ///
123    /// * `topic` - The name of the topic to subscribe to.
124    /// * `selector` - The filter selector. Pass `None` to receive all messages.
125    ///
126    /// # Errors
127    ///
128    /// Returns an error if the topic name is invalid, the selector expression is rejected
129    /// by the broker, or the subscription cannot be registered.
130    async fn subscribe_with_selector(
131        &self,
132        topic: &str,
133        selector: Option<MessageSelector>,
134    ) -> rocketmq_error::RocketMQResult<()>;
135
136    /// Removes the subscription for the specified topic.
137    ///
138    /// Messages for this topic will no longer be fetched after the next rebalance cycle.
139    /// This function does not block the calling thread.
140    ///
141    /// # Arguments
142    ///
143    /// * `topic` - The name of the topic to unsubscribe from.
144    async fn unsubscribe(&self, topic: &str);
145
146    /// Returns the set of [`MessageQueue`]s currently assigned to this consumer.
147    ///
148    /// This function does not block the calling thread.
149    ///
150    /// # Errors
151    ///
152    /// Returns an error if the consumer is not in the running state.
153    async fn assignment(&self) -> rocketmq_error::RocketMQResult<HashSet<MessageQueue>>;
154
155    /// Manually assigns the given [`MessageQueue`]s to this consumer, bypassing broker rebalance.
156    ///
157    /// Any previously assigned queues not present in `message_queues` are removed.
158    /// This function does not block the calling thread.
159    ///
160    /// # Arguments
161    ///
162    /// * `message_queues` - The complete set of queues to assign to this consumer.
163    async fn assign(&self, message_queues: Vec<MessageQueue>);
164
165    /// Sets the subscription filter expression applied when fetching from manually assigned queues.
166    ///
167    /// This function does not block the calling thread.
168    ///
169    /// # Arguments
170    ///
171    /// * `topic` - The topic for which the filter expression applies.
172    /// * `sub_expression` - A tag expression or SQL-92 predicate used to filter messages.
173    async fn set_sub_expression_for_assign(&self, topic: &str, sub_expression: &str);
174
175    /// Populates `sub_expression_map` with the filter selector for each subscribed topic,
176    /// providing the subscription metadata required for heartbeat payloads.
177    ///
178    /// This function does not block the calling thread.
179    ///
180    /// # Arguments
181    ///
182    /// * `sub_expression_map` - Output map from topic name to its [`MessageSelector`]. Entries are
183    ///   inserted for every topic that has an active subscription with a selector.
184    ///
185    /// # Errors
186    ///
187    /// Returns an error if the subscription metadata cannot be retrieved.
188    async fn build_subscriptions_for_heartbeat(
189        &self,
190        sub_expression_map: &mut HashMap<String, MessageSelector>,
191    ) -> rocketmq_error::RocketMQResult<()>;
192
193    /// Fetches the next batch of messages without allocating owned copies.
194    ///
195    /// Returns `ArcMut<MessageExt>` references to messages, providing shared mutable access
196    /// without heap allocation or deep cloning. The returned references remain valid until
197    /// they are dropped. Messages that need to outlive the poll scope must be cloned explicitly.
198    ///
199    /// This method uses the default poll timeout configured for the consumer.
200    ///
201    /// # Performance
202    ///
203    /// Message contents are not copied. For workloads processing messages without long-term
204    /// storage, this eliminates allocation overhead compared to [`poll()`].
205    ///
206    /// # Examples
207    ///
208    /// ```rust,ignore
209    /// let messages = consumer.poll_zero_copy().await;
210    /// for msg in &messages {
211    ///     process_message(msg);
212    /// }
213    ///
214    /// // Clone only filtered messages
215    /// let messages = consumer.poll_zero_copy().await;
216    /// let important: Vec<MessageExt> = messages.into_iter()
217    ///     .filter(|msg| is_important(msg))
218    ///     .map(|msg| (*msg).clone())
219    ///     .collect();
220    /// ```
221    ///
222    /// Returns an empty vector if no messages are available within the default timeout period.
223    /// This function does not block the calling thread.
224    async fn poll_zero_copy(&self) -> Vec<rocketmq_rust::ArcMut<MessageExt>>;
225
226    /// Fetches the next batch of messages without allocating owned copies, with a specified
227    /// timeout.
228    ///
229    /// Behaves identically to [`poll_zero_copy()`], but waits up to `timeout` milliseconds
230    /// for messages to become available.
231    ///
232    /// # Arguments
233    ///
234    /// * `timeout` - Maximum time to wait for messages, in milliseconds.
235    ///
236    /// # Examples
237    ///
238    /// ```rust,ignore
239    /// let messages = consumer.poll_with_timeout_zero_copy(1000).await;
240    /// ```
241    ///
242    /// Returns an empty vector if no messages are available before the timeout expires.
243    /// This function does not block the calling thread.
244    async fn poll_with_timeout_zero_copy(&self, timeout: u64) -> Vec<rocketmq_rust::ArcMut<MessageExt>>;
245
246    /// Fetches the next batch of messages, returning owned copies.
247    ///
248    /// Each returned message is cloned from the internal message store. The caller
249    /// owns the returned messages and may store them beyond the poll scope.
250    ///
251    /// This method uses the default poll timeout configured for the consumer.
252    ///
253    /// # Performance
254    ///
255    /// All messages are deep-cloned, including message body and properties. For a 2KB message,
256    /// each poll returning 32 messages allocates approximately 90KB. At 100 polls per second,
257    /// this results in approximately 9MB/s of allocations.
258    ///
259    /// For workloads that do not require owned messages, [`poll_zero_copy()`] avoids
260    /// this allocation overhead.
261    ///
262    /// # Examples
263    ///
264    /// ```rust,ignore
265    /// let messages = consumer.poll().await;
266    /// my_store.save(messages);
267    /// ```
268    ///
269    /// Returns an empty vector if no messages are available within the timeout period.
270    /// This function does not block the calling thread.
271    async fn poll(&self) -> Vec<MessageExt>;
272
273    /// Fetches the next batch of messages with a specified timeout, returning owned copies.
274    ///
275    /// Behaves identically to [`poll()`], but waits up to `timeout` milliseconds for
276    /// messages to become available. All messages are deep-cloned.
277    ///
278    /// # Arguments
279    ///
280    /// * `timeout` - Maximum time to wait for messages, in milliseconds.
281    ///
282    /// Returns an empty vector if no messages are available before the timeout expires.
283    /// This function does not block the calling thread.
284    async fn poll_with_timeout(&self, timeout: u64) -> Vec<MessageExt>;
285
286    /// Seeks the fetch position of the specified [`MessageQueue`] to the given offset.
287    ///
288    /// The next [`poll`] invocation will return messages starting from `offset`.
289    /// This function does not block the calling thread.
290    ///
291    /// # Arguments
292    ///
293    /// * `message_queue` - The queue whose fetch position is to be updated.
294    /// * `offset` - The target offset. Must be within the queue's valid range.
295    ///
296    /// # Errors
297    ///
298    /// Returns an error if the queue is not currently assigned to this consumer,
299    /// or if the specified offset is out of the valid range.
300    ///
301    /// [`poll`]: LitePullConsumerLocal::poll
302    async fn seek(&self, message_queue: &MessageQueue, offset: i64) -> rocketmq_error::RocketMQResult<()>;
303
304    /// Suspends message fetching for the specified [`MessageQueue`]s.
305    ///
306    /// Paused queues are excluded from subsequent [`poll`] results until [`resume`] is called.
307    /// This function does not block the calling thread.
308    ///
309    /// # Arguments
310    ///
311    /// * `message_queues` - The queues to pause.
312    ///
313    /// [`poll`]: LitePullConsumerLocal::poll
314    /// [`resume`]: LitePullConsumerLocal::resume
315    async fn pause(&self, message_queues: Vec<MessageQueue>);
316
317    /// Resumes message fetching for the specified [`MessageQueue`]s.
318    ///
319    /// This function does not block the calling thread.
320    ///
321    /// # Arguments
322    ///
323    /// * `message_queues` - The queues to resume.
324    async fn resume(&self, message_queues: Vec<MessageQueue>);
325
326    /// Returns whether automatic offset commit is enabled.
327    ///
328    /// This function does not block the calling thread.
329    async fn is_auto_commit(&self) -> bool;
330
331    /// Enables or disables automatic offset commit.
332    ///
333    /// When auto-commit is enabled, offsets are committed periodically without explicit calls
334    /// to [`commit`] or [`commit_sync`]. When disabled, the caller is responsible for committing
335    /// offsets.
336    ///
337    /// This function does not block the calling thread.
338    ///
339    /// # Arguments
340    ///
341    /// * `auto_commit` - `true` to enable automatic offset commit; `false` to disable it.
342    ///
343    /// [`commit`]: LitePullConsumerLocal::commit
344    /// [`commit_sync`]: LitePullConsumerLocal::commit_sync
345    async fn set_auto_commit(&self, auto_commit: bool);
346
347    /// Queries the broker for all [`MessageQueue`]s belonging to the specified topic.
348    ///
349    /// This function does not block the calling thread.
350    ///
351    /// # Arguments
352    ///
353    /// * `topic` - The name of the topic to query.
354    ///
355    /// # Errors
356    ///
357    /// Returns an error if the topic does not exist, if the name server is unreachable,
358    /// or if the consumer is not in the running state.
359    async fn fetch_message_queues(&self, topic: &str) -> rocketmq_error::RocketMQResult<Vec<MessageQueue>>;
360
361    /// Queries the broker for the offset corresponding to the given timestamp in a queue.
362    ///
363    /// This function does not block the calling thread.
364    ///
365    /// # Arguments
366    ///
367    /// * `message_queue` - The queue to query.
368    /// * `timestamp` - The Unix timestamp in milliseconds. The broker returns the offset of the
369    ///   first message stored at or after this timestamp.
370    ///
371    /// # Errors
372    ///
373    /// Returns an error if the queue is not found on the broker or the query fails.
374    async fn offset_for_timestamp(
375        &self,
376        message_queue: &MessageQueue,
377        timestamp: u64,
378    ) -> rocketmq_error::RocketMQResult<i64>;
379
380    /// Commits all consumed offsets and waits for the broker to acknowledge the operation.
381    ///
382    /// This function does not block the calling thread.
383    ///
384    /// # Deprecation
385    ///
386    /// This method is deprecated. The name implies synchronous behavior, but the underlying
387    /// implementation relies on a background thread to commit offsets rather than committing
388    /// synchronously. Use [`commit`] instead.
389    ///
390    /// [`commit`]: LitePullConsumerLocal::commit
391    async fn commit_sync(&self);
392
393    /// Commits the provided offsets and optionally persists them to the broker.
394    ///
395    /// This function does not block the calling thread.
396    ///
397    /// # Deprecation
398    ///
399    /// This method is deprecated. The name implies synchronous behavior, but the underlying
400    /// implementation relies on a background thread to commit offsets rather than committing
401    /// synchronously. Use [`commit_with_map`] instead.
402    ///
403    /// # Arguments
404    ///
405    /// * `offset_map` - A map from [`MessageQueue`] to the offset to commit.
406    /// * `persist` - When `true`, the committed offsets are persisted to the broker immediately.
407    ///
408    /// [`commit_with_map`]: LitePullConsumerLocal::commit_with_map
409    async fn commit_sync_with_map(&self, offset_map: HashMap<MessageQueue, i64>, persist: bool);
410
411    /// Commits all consumed offsets asynchronously.
412    ///
413    /// This function does not block the calling thread. The commit is performed in the
414    /// background; use [`commit_sync`] if acknowledgment is required before proceeding.
415    ///
416    /// [`commit_sync`]: LitePullConsumerLocal::commit_sync
417    async fn commit(&self);
418
419    /// Commits the provided offsets asynchronously, optionally persisting them to the broker.
420    ///
421    /// This function does not block the calling thread.
422    ///
423    /// # Arguments
424    ///
425    /// * `offset_map` - A map from [`MessageQueue`] to the offset to commit.
426    /// * `persist` - When `true`, the committed offsets are persisted to the broker.
427    async fn commit_with_map(&self, offset_map: HashMap<MessageQueue, i64>, persist: bool);
428
429    /// Commits the offsets for the specified subset of assigned queues.
430    ///
431    /// This function does not block the calling thread.
432    ///
433    /// # Arguments
434    ///
435    /// * `message_queues` - The queues whose current offsets are to be committed.
436    /// * `persist` - When `true`, the committed offsets are persisted to the broker.
437    async fn commit_with_set(&self, message_queues: HashSet<MessageQueue>, persist: bool);
438
439    /// Returns the last committed offset for the specified [`MessageQueue`].
440    ///
441    /// This function does not block the calling thread.
442    ///
443    /// # Arguments
444    ///
445    /// * `message_queue` - The queue to query.
446    ///
447    /// # Errors
448    ///
449    /// Returns an error if the queue is not assigned to this consumer or if the offset
450    /// cannot be retrieved from the offset store.
451    async fn committed(&self, message_queue: &MessageQueue) -> rocketmq_error::RocketMQResult<i64>;
452
453    /// Registers a listener that is notified when the set of [`MessageQueue`]s for a topic changes.
454    ///
455    /// This function does not block the calling thread.
456    ///
457    /// # Arguments
458    ///
459    /// * `topic` - The topic to monitor for queue changes.
460    /// * `listener` - A [`TopicMessageQueueChangeListener`] invoked when the queue set changes.
461    ///
462    /// # Errors
463    ///
464    /// Returns an error if a listener is already registered for the given topic, or if the
465    /// registration fails due to an internal error.
466    async fn register_topic_message_queue_change_listener<TL>(
467        &self,
468        topic: &str,
469        listener: TL,
470    ) -> rocketmq_error::RocketMQResult<()>
471    where
472        TL: TopicMessageQueueChangeListener + 'static;
473
474    /// Updates the name server address used for topic route discovery.
475    ///
476    /// This function does not block the calling thread.
477    ///
478    /// # Arguments
479    ///
480    /// * `name_server_address` - The new semicolon-separated name server address list.
481    async fn update_name_server_address(&self, name_server_address: &str);
482
483    /// Seeks the fetch position of the specified [`MessageQueue`] to its earliest available offset.
484    ///
485    /// This function does not block the calling thread.
486    ///
487    /// # Arguments
488    ///
489    /// * `message_queue` - The queue to seek to the beginning.
490    ///
491    /// # Errors
492    ///
493    /// Returns an error if the queue is not assigned to this consumer or if the earliest
494    /// offset cannot be retrieved from the broker.
495    async fn seek_to_begin(&self, message_queue: &MessageQueue) -> rocketmq_error::RocketMQResult<()>;
496
497    /// Seeks the fetch position of the specified [`MessageQueue`] to its latest available offset.
498    ///
499    /// The next [`poll`] call will return only messages published after this point.
500    /// This function does not block the calling thread.
501    ///
502    /// # Arguments
503    ///
504    /// * `message_queue` - The queue to seek to the end.
505    ///
506    /// # Errors
507    ///
508    /// Returns an error if the queue is not assigned to this consumer or if the latest
509    /// offset cannot be retrieved from the broker.
510    ///
511    /// [`poll`]: LitePullConsumerLocal::poll
512    async fn seek_to_end(&self, message_queue: &MessageQueue) -> rocketmq_error::RocketMQResult<()>;
513
514    /// Commits all consumed offsets for all assigned queues.
515    ///
516    /// This method commits the current consumption offset for every assigned [`MessageQueue`].
517    /// Unlike [`commit`], which commits offsets asynchronously in the background, this method
518    /// ensures all offsets are persisted to the broker.
519    ///
520    /// This function does not block the calling thread.
521    ///
522    /// # Errors
523    ///
524    /// Returns an error if the consumer is not in the running state or if the offset
525    /// persistence fails.
526    ///
527    /// [`commit`]: LitePullConsumerLocal::commit
528    async fn commit_all(&self) -> rocketmq_error::RocketMQResult<()>;
529
530    /// Checks whether a specific [`MessageQueue`] is currently paused.
531    ///
532    /// A paused queue will not be fetched from during [`poll`] operations until it is resumed.
533    ///
534    /// This function does not block the calling thread.
535    ///
536    /// # Arguments
537    ///
538    /// * `message_queue` - The queue to check.
539    ///
540    /// # Returns
541    ///
542    /// `true` if the queue is paused, `false` otherwise.
543    ///
544    /// [`poll`]: LitePullConsumerLocal::poll
545    async fn is_paused(&self, message_queue: &MessageQueue) -> bool;
546}