pub trait LitePullConsumerLocal: Sync {
Show 35 methods
// Required methods
async fn start(&self) -> RocketMQResult<()>;
async fn shutdown(&self);
async fn is_running(&self) -> bool;
async fn subscribe(&self, topic: &str) -> RocketMQResult<()>;
async fn subscribe_with_expression(
&self,
topic: &str,
sub_expression: &str,
) -> RocketMQResult<()>;
async fn subscribe_with_listener<MQL>(
&self,
topic: &str,
sub_expression: &str,
listener: MQL,
) -> RocketMQResult<()>
where MQL: MessageQueueListener + 'static;
async fn subscribe_with_selector(
&self,
topic: &str,
selector: Option<MessageSelector>,
) -> RocketMQResult<()>;
async fn unsubscribe(&self, topic: &str);
async fn assignment(&self) -> RocketMQResult<HashSet<MessageQueue>>;
async fn assign(&self, message_queues: Vec<MessageQueue>);
async fn set_sub_expression_for_assign(
&self,
topic: &str,
sub_expression: &str,
);
async fn build_subscriptions_for_heartbeat(
&self,
sub_expression_map: &mut HashMap<String, MessageSelector>,
) -> RocketMQResult<()>;
async fn poll_zero_copy(&self) -> Vec<ArcMut<MessageExt>>;
async fn poll_with_timeout_zero_copy(
&self,
timeout: u64,
) -> Vec<ArcMut<MessageExt>>;
async fn poll(&self) -> Vec<MessageExt>;
async fn poll_with_timeout(&self, timeout: u64) -> Vec<MessageExt>;
async fn seek(
&self,
message_queue: &MessageQueue,
offset: i64,
) -> RocketMQResult<()>;
async fn pause(&self, message_queues: Vec<MessageQueue>);
async fn resume(&self, message_queues: Vec<MessageQueue>);
async fn is_auto_commit(&self) -> bool;
async fn set_auto_commit(&self, auto_commit: bool);
async fn fetch_message_queues(
&self,
topic: &str,
) -> RocketMQResult<Vec<MessageQueue>>;
async fn offset_for_timestamp(
&self,
message_queue: &MessageQueue,
timestamp: u64,
) -> RocketMQResult<i64>;
async fn commit_sync(&self);
async fn commit_sync_with_map(
&self,
offset_map: HashMap<MessageQueue, i64>,
persist: bool,
);
async fn commit(&self);
async fn commit_with_map(
&self,
offset_map: HashMap<MessageQueue, i64>,
persist: bool,
);
async fn commit_with_set(
&self,
message_queues: HashSet<MessageQueue>,
persist: bool,
);
async fn committed(
&self,
message_queue: &MessageQueue,
) -> RocketMQResult<i64>;
async fn register_topic_message_queue_change_listener<TL>(
&self,
topic: &str,
listener: TL,
) -> RocketMQResult<()>
where TL: TopicMessageQueueChangeListener + 'static;
async fn update_name_server_address(&self, name_server_address: &str);
async fn seek_to_begin(
&self,
message_queue: &MessageQueue,
) -> RocketMQResult<()>;
async fn seek_to_end(
&self,
message_queue: &MessageQueue,
) -> RocketMQResult<()>;
async fn commit_all(&self) -> RocketMQResult<()>;
async fn is_paused(&self, message_queue: &MessageQueue) -> bool;
}Expand description
A consumer that pulls messages from brokers on demand, providing explicit control over fetch timing, offset management, and queue assignment.
Unlike the push consumer, the caller drives message retrieval through poll or
poll_with_timeout, and may manage offsets manually when auto-commit is disabled.
Queue assignment can be controlled either by subscribing to topics (broker-side rebalance)
or by calling assign directly (client-controlled assignment).
All methods are asynchronous and do not block the calling thread.
Required Methods§
Sourceasync fn start(&self) -> RocketMQResult<()>
async fn start(&self) -> RocketMQResult<()>
Starts the consumer and establishes connections to the broker and name server.
This function does not block the calling thread.
§Errors
Returns an error if the consumer is already running, if required configuration is invalid, or if the connection to the name server cannot be established.
Sourceasync fn shutdown(&self)
async fn shutdown(&self)
Shuts down the consumer and releases all associated resources.
This function does not block the calling thread. After shutdown, the consumer cannot be restarted.
Sourceasync fn is_running(&self) -> bool
async fn is_running(&self) -> bool
Returns whether the consumer is currently in the running state.
This function does not block the calling thread.
Sourceasync fn subscribe(&self, topic: &str) -> RocketMQResult<()>
async fn subscribe(&self, topic: &str) -> RocketMQResult<()>
Sourceasync fn subscribe_with_expression(
&self,
topic: &str,
sub_expression: &str,
) -> RocketMQResult<()>
async fn subscribe_with_expression( &self, topic: &str, sub_expression: &str, ) -> RocketMQResult<()>
Subscribes to the specified topic with a tag-based or SQL-based filter expression.
This function does not block the calling thread.
§Arguments
topic- The name of the topic to subscribe to.sub_expression- A tag expression (e.g."TagA || TagB") or SQL-92 predicate. Pass"*"to receive all messages.
§Errors
Returns an error if the topic name is invalid, the expression cannot be parsed, or the subscription cannot be registered with the broker.
Sourceasync fn subscribe_with_listener<MQL>(
&self,
topic: &str,
sub_expression: &str,
listener: MQL,
) -> RocketMQResult<()>where
MQL: MessageQueueListener + 'static,
async fn subscribe_with_listener<MQL>(
&self,
topic: &str,
sub_expression: &str,
listener: MQL,
) -> RocketMQResult<()>where
MQL: MessageQueueListener + 'static,
Subscribes to the specified topic with a filter expression and a queue-change listener.
The listener is invoked whenever the set of assigned MessageQueues changes for
this topic due to rebalance.
This function does not block the calling thread.
§Arguments
topic- The name of the topic to subscribe to.sub_expression- A tag expression or SQL-92 predicate. Pass"*"for all messages.listener- AMessageQueueListenernotified on queue assignment changes.
§Errors
Returns an error if the topic name is invalid, the expression cannot be parsed, or the subscription cannot be registered with the broker.
Sourceasync fn subscribe_with_selector(
&self,
topic: &str,
selector: Option<MessageSelector>,
) -> RocketMQResult<()>
async fn subscribe_with_selector( &self, topic: &str, selector: Option<MessageSelector>, ) -> RocketMQResult<()>
Subscribes to the specified topic using a MessageSelector for server-side filtering.
This function does not block the calling thread.
§Arguments
topic- The name of the topic to subscribe to.selector- The filter selector. PassNoneto receive all messages.
§Errors
Returns an error if the topic name is invalid, the selector expression is rejected by the broker, or the subscription cannot be registered.
Sourceasync fn unsubscribe(&self, topic: &str)
async fn unsubscribe(&self, topic: &str)
Removes the subscription for the specified topic.
Messages for this topic will no longer be fetched after the next rebalance cycle. This function does not block the calling thread.
§Arguments
topic- The name of the topic to unsubscribe from.
Sourceasync fn assignment(&self) -> RocketMQResult<HashSet<MessageQueue>>
async fn assignment(&self) -> RocketMQResult<HashSet<MessageQueue>>
Returns the set of MessageQueues currently assigned to this consumer.
This function does not block the calling thread.
§Errors
Returns an error if the consumer is not in the running state.
Sourceasync fn assign(&self, message_queues: Vec<MessageQueue>)
async fn assign(&self, message_queues: Vec<MessageQueue>)
Manually assigns the given MessageQueues to this consumer, bypassing broker rebalance.
Any previously assigned queues not present in message_queues are removed.
This function does not block the calling thread.
§Arguments
message_queues- The complete set of queues to assign to this consumer.
Sourceasync fn set_sub_expression_for_assign(&self, topic: &str, sub_expression: &str)
async fn set_sub_expression_for_assign(&self, topic: &str, sub_expression: &str)
Sets the subscription filter expression applied when fetching from manually assigned queues.
This function does not block the calling thread.
§Arguments
topic- The topic for which the filter expression applies.sub_expression- A tag expression or SQL-92 predicate used to filter messages.
Sourceasync fn build_subscriptions_for_heartbeat(
&self,
sub_expression_map: &mut HashMap<String, MessageSelector>,
) -> RocketMQResult<()>
async fn build_subscriptions_for_heartbeat( &self, sub_expression_map: &mut HashMap<String, MessageSelector>, ) -> RocketMQResult<()>
Populates sub_expression_map with the filter selector for each subscribed topic,
providing the subscription metadata required for heartbeat payloads.
This function does not block the calling thread.
§Arguments
sub_expression_map- Output map from topic name to itsMessageSelector. Entries are inserted for every topic that has an active subscription with a selector.
§Errors
Returns an error if the subscription metadata cannot be retrieved.
Sourceasync fn poll_zero_copy(&self) -> Vec<ArcMut<MessageExt>>
async fn poll_zero_copy(&self) -> Vec<ArcMut<MessageExt>>
Fetches the next batch of messages without allocating owned copies.
Returns ArcMut<MessageExt> references to messages, providing shared mutable access
without heap allocation or deep cloning. The returned references remain valid until
they are dropped. Messages that need to outlive the poll scope must be cloned explicitly.
This method uses the default poll timeout configured for the consumer.
§Performance
Message contents are not copied. For workloads processing messages without long-term
storage, this eliminates allocation overhead compared to [poll()].
§Examples
let messages = consumer.poll_zero_copy().await;
for msg in &messages {
process_message(msg);
}
// Clone only filtered messages
let messages = consumer.poll_zero_copy().await;
let important: Vec<MessageExt> = messages.into_iter()
.filter(|msg| is_important(msg))
.map(|msg| (*msg).clone())
.collect();Returns an empty vector if no messages are available within the default timeout period. This function does not block the calling thread.
Sourceasync fn poll_with_timeout_zero_copy(
&self,
timeout: u64,
) -> Vec<ArcMut<MessageExt>>
async fn poll_with_timeout_zero_copy( &self, timeout: u64, ) -> Vec<ArcMut<MessageExt>>
Fetches the next batch of messages without allocating owned copies, with a specified timeout.
Behaves identically to [poll_zero_copy()], but waits up to timeout milliseconds
for messages to become available.
§Arguments
timeout- Maximum time to wait for messages, in milliseconds.
§Examples
let messages = consumer.poll_with_timeout_zero_copy(1000).await;Returns an empty vector if no messages are available before the timeout expires. This function does not block the calling thread.
Sourceasync fn poll(&self) -> Vec<MessageExt>
async fn poll(&self) -> Vec<MessageExt>
Fetches the next batch of messages, returning owned copies.
Each returned message is cloned from the internal message store. The caller owns the returned messages and may store them beyond the poll scope.
This method uses the default poll timeout configured for the consumer.
§Performance
All messages are deep-cloned, including message body and properties. For a 2KB message, each poll returning 32 messages allocates approximately 90KB. At 100 polls per second, this results in approximately 9MB/s of allocations.
For workloads that do not require owned messages, [poll_zero_copy()] avoids
this allocation overhead.
§Examples
let messages = consumer.poll().await;
my_store.save(messages);Returns an empty vector if no messages are available within the timeout period. This function does not block the calling thread.
Sourceasync fn poll_with_timeout(&self, timeout: u64) -> Vec<MessageExt>
async fn poll_with_timeout(&self, timeout: u64) -> Vec<MessageExt>
Fetches the next batch of messages with a specified timeout, returning owned copies.
Behaves identically to [poll()], but waits up to timeout milliseconds for
messages to become available. All messages are deep-cloned.
§Arguments
timeout- Maximum time to wait for messages, in milliseconds.
Returns an empty vector if no messages are available before the timeout expires. This function does not block the calling thread.
Sourceasync fn seek(
&self,
message_queue: &MessageQueue,
offset: i64,
) -> RocketMQResult<()>
async fn seek( &self, message_queue: &MessageQueue, offset: i64, ) -> RocketMQResult<()>
Seeks the fetch position of the specified MessageQueue to the given offset.
The next poll invocation will return messages starting from offset.
This function does not block the calling thread.
§Arguments
message_queue- The queue whose fetch position is to be updated.offset- The target offset. Must be within the queue’s valid range.
§Errors
Returns an error if the queue is not currently assigned to this consumer, or if the specified offset is out of the valid range.
Sourceasync fn pause(&self, message_queues: Vec<MessageQueue>)
async fn pause(&self, message_queues: Vec<MessageQueue>)
Suspends message fetching for the specified MessageQueues.
Paused queues are excluded from subsequent poll results until resume is called.
This function does not block the calling thread.
§Arguments
message_queues- The queues to pause.
Sourceasync fn resume(&self, message_queues: Vec<MessageQueue>)
async fn resume(&self, message_queues: Vec<MessageQueue>)
Resumes message fetching for the specified MessageQueues.
This function does not block the calling thread.
§Arguments
message_queues- The queues to resume.
Sourceasync fn is_auto_commit(&self) -> bool
async fn is_auto_commit(&self) -> bool
Returns whether automatic offset commit is enabled.
This function does not block the calling thread.
Sourceasync fn set_auto_commit(&self, auto_commit: bool)
async fn set_auto_commit(&self, auto_commit: bool)
Enables or disables automatic offset commit.
When auto-commit is enabled, offsets are committed periodically without explicit calls
to commit or commit_sync. When disabled, the caller is responsible for committing
offsets.
This function does not block the calling thread.
§Arguments
auto_commit-trueto enable automatic offset commit;falseto disable it.
Sourceasync fn fetch_message_queues(
&self,
topic: &str,
) -> RocketMQResult<Vec<MessageQueue>>
async fn fetch_message_queues( &self, topic: &str, ) -> RocketMQResult<Vec<MessageQueue>>
Queries the broker for all MessageQueues belonging to the specified topic.
This function does not block the calling thread.
§Arguments
topic- The name of the topic to query.
§Errors
Returns an error if the topic does not exist, if the name server is unreachable, or if the consumer is not in the running state.
Sourceasync fn offset_for_timestamp(
&self,
message_queue: &MessageQueue,
timestamp: u64,
) -> RocketMQResult<i64>
async fn offset_for_timestamp( &self, message_queue: &MessageQueue, timestamp: u64, ) -> RocketMQResult<i64>
Queries the broker for the offset corresponding to the given timestamp in a queue.
This function does not block the calling thread.
§Arguments
message_queue- The queue to query.timestamp- The Unix timestamp in milliseconds. The broker returns the offset of the first message stored at or after this timestamp.
§Errors
Returns an error if the queue is not found on the broker or the query fails.
Sourceasync fn commit_sync(&self)
async fn commit_sync(&self)
Commits all consumed offsets and waits for the broker to acknowledge the operation.
This function does not block the calling thread.
§Deprecation
This method is deprecated. The name implies synchronous behavior, but the underlying
implementation relies on a background thread to commit offsets rather than committing
synchronously. Use commit instead.
Sourceasync fn commit_sync_with_map(
&self,
offset_map: HashMap<MessageQueue, i64>,
persist: bool,
)
async fn commit_sync_with_map( &self, offset_map: HashMap<MessageQueue, i64>, persist: bool, )
Commits the provided offsets and optionally persists them to the broker.
This function does not block the calling thread.
§Deprecation
This method is deprecated. The name implies synchronous behavior, but the underlying
implementation relies on a background thread to commit offsets rather than committing
synchronously. Use commit_with_map instead.
§Arguments
offset_map- A map fromMessageQueueto the offset to commit.persist- Whentrue, the committed offsets are persisted to the broker immediately.
Sourceasync fn commit(&self)
async fn commit(&self)
Commits all consumed offsets asynchronously.
This function does not block the calling thread. The commit is performed in the
background; use commit_sync if acknowledgment is required before proceeding.
Sourceasync fn commit_with_map(
&self,
offset_map: HashMap<MessageQueue, i64>,
persist: bool,
)
async fn commit_with_map( &self, offset_map: HashMap<MessageQueue, i64>, persist: bool, )
Commits the provided offsets asynchronously, optionally persisting them to the broker.
This function does not block the calling thread.
§Arguments
offset_map- A map fromMessageQueueto the offset to commit.persist- Whentrue, the committed offsets are persisted to the broker.
Sourceasync fn commit_with_set(
&self,
message_queues: HashSet<MessageQueue>,
persist: bool,
)
async fn commit_with_set( &self, message_queues: HashSet<MessageQueue>, persist: bool, )
Commits the offsets for the specified subset of assigned queues.
This function does not block the calling thread.
§Arguments
message_queues- The queues whose current offsets are to be committed.persist- Whentrue, the committed offsets are persisted to the broker.
Sourceasync fn committed(&self, message_queue: &MessageQueue) -> RocketMQResult<i64>
async fn committed(&self, message_queue: &MessageQueue) -> RocketMQResult<i64>
Returns the last committed offset for the specified MessageQueue.
This function does not block the calling thread.
§Arguments
message_queue- The queue to query.
§Errors
Returns an error if the queue is not assigned to this consumer or if the offset cannot be retrieved from the offset store.
Sourceasync fn register_topic_message_queue_change_listener<TL>(
&self,
topic: &str,
listener: TL,
) -> RocketMQResult<()>where
TL: TopicMessageQueueChangeListener + 'static,
async fn register_topic_message_queue_change_listener<TL>(
&self,
topic: &str,
listener: TL,
) -> RocketMQResult<()>where
TL: TopicMessageQueueChangeListener + 'static,
Registers a listener that is notified when the set of MessageQueues for a topic changes.
This function does not block the calling thread.
§Arguments
topic- The topic to monitor for queue changes.listener- ATopicMessageQueueChangeListenerinvoked when the queue set changes.
§Errors
Returns an error if a listener is already registered for the given topic, or if the registration fails due to an internal error.
Sourceasync fn update_name_server_address(&self, name_server_address: &str)
async fn update_name_server_address(&self, name_server_address: &str)
Updates the name server address used for topic route discovery.
This function does not block the calling thread.
§Arguments
name_server_address- The new semicolon-separated name server address list.
Sourceasync fn seek_to_begin(
&self,
message_queue: &MessageQueue,
) -> RocketMQResult<()>
async fn seek_to_begin( &self, message_queue: &MessageQueue, ) -> RocketMQResult<()>
Seeks the fetch position of the specified MessageQueue to its earliest available offset.
This function does not block the calling thread.
§Arguments
message_queue- The queue to seek to the beginning.
§Errors
Returns an error if the queue is not assigned to this consumer or if the earliest offset cannot be retrieved from the broker.
Sourceasync fn seek_to_end(&self, message_queue: &MessageQueue) -> RocketMQResult<()>
async fn seek_to_end(&self, message_queue: &MessageQueue) -> RocketMQResult<()>
Seeks the fetch position of the specified MessageQueue to its latest available offset.
The next poll call will return only messages published after this point.
This function does not block the calling thread.
§Arguments
message_queue- The queue to seek to the end.
§Errors
Returns an error if the queue is not assigned to this consumer or if the latest offset cannot be retrieved from the broker.
Sourceasync fn commit_all(&self) -> RocketMQResult<()>
async fn commit_all(&self) -> RocketMQResult<()>
Commits all consumed offsets for all assigned queues.
This method commits the current consumption offset for every assigned MessageQueue.
Unlike commit, which commits offsets asynchronously in the background, this method
ensures all offsets are persisted to the broker.
This function does not block the calling thread.
§Errors
Returns an error if the consumer is not in the running state or if the offset persistence fails.
Sourceasync fn is_paused(&self, message_queue: &MessageQueue) -> bool
async fn is_paused(&self, message_queue: &MessageQueue) -> bool
Checks whether a specific MessageQueue is currently paused.
A paused queue will not be fetched from during poll operations until it is resumed.
This function does not block the calling thread.
§Arguments
message_queue- The queue to check.
§Returns
true if the queue is paused, false otherwise.
Dyn Compatibility§
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.