rocketmq_client_rust/consumer/lite_pull_consumer.rs
// Copyright 2023 The RocketMQ Rust Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16use std::collections::HashSet;
17
18use rocketmq_common::common::message::message_ext::MessageExt;
19use rocketmq_common::common::message::message_queue::MessageQueue;
20
21use crate::consumer::message_queue_listener::MessageQueueListener;
22use crate::consumer::message_selector::MessageSelector;
23use crate::consumer::topic_message_queue_change_listener::TopicMessageQueueChangeListener;
24
25/// A consumer that pulls messages from brokers on demand, providing explicit control over
26/// fetch timing, offset management, and queue assignment.
27///
28/// Unlike the push consumer, the caller drives message retrieval through [`poll`] or
29/// [`poll_with_timeout`], and may manage offsets manually when auto-commit is disabled.
30/// Queue assignment can be controlled either by subscribing to topics (broker-side rebalance)
31/// or by calling [`assign`] directly (client-controlled assignment).
32///
33/// All methods are asynchronous and do not block the calling thread.
34///
35/// [`poll`]: LitePullConsumerLocal::poll
36/// [`poll_with_timeout`]: LitePullConsumerLocal::poll_with_timeout
37/// [`assign`]: LitePullConsumerLocal::assign
38#[trait_variant::make(LitePullConsumer: Send)]
39pub trait LitePullConsumerLocal: Sync {
40 /// Starts the consumer and establishes connections to the broker and name server.
41 ///
42 /// This function does not block the calling thread.
43 ///
44 /// # Errors
45 ///
46 /// Returns an error if the consumer is already running, if required configuration is
47 /// invalid, or if the connection to the name server cannot be established.
48 async fn start(&self) -> rocketmq_error::RocketMQResult<()>;
49
50 /// Shuts down the consumer and releases all associated resources.
51 ///
52 /// This function does not block the calling thread. After shutdown, the consumer
53 /// cannot be restarted.
54 async fn shutdown(&self);
55
56 /// Returns whether the consumer is currently in the running state.
57 ///
58 /// This function does not block the calling thread.
59 async fn is_running(&self) -> bool;
60
61 /// Subscribes to the specified topic using the default subscription expression.
62 ///
63 /// This function does not block the calling thread.
64 ///
65 /// # Arguments
66 ///
67 /// * `topic` - The name of the topic to subscribe to.
68 ///
69 /// # Errors
70 ///
71 /// Returns an error if the topic name is invalid or if the subscription cannot be
72 /// registered with the broker.
73 async fn subscribe(&self, topic: &str) -> rocketmq_error::RocketMQResult<()>;
74
75 /// Subscribes to the specified topic with a tag-based or SQL-based filter expression.
76 ///
77 /// This function does not block the calling thread.
78 ///
79 /// # Arguments
80 ///
81 /// * `topic` - The name of the topic to subscribe to.
82 /// * `sub_expression` - A tag expression (e.g. `"TagA || TagB"`) or SQL-92 predicate. Pass
83 /// `"*"` to receive all messages.
84 ///
85 /// # Errors
86 ///
87 /// Returns an error if the topic name is invalid, the expression cannot be parsed,
88 /// or the subscription cannot be registered with the broker.
89 async fn subscribe_with_expression(&self, topic: &str, sub_expression: &str) -> rocketmq_error::RocketMQResult<()>;
90
91 /// Subscribes to the specified topic with a filter expression and a queue-change listener.
92 ///
93 /// The listener is invoked whenever the set of assigned [`MessageQueue`]s changes for
94 /// this topic due to rebalance.
95 ///
96 /// This function does not block the calling thread.
97 ///
98 /// # Arguments
99 ///
100 /// * `topic` - The name of the topic to subscribe to.
101 /// * `sub_expression` - A tag expression or SQL-92 predicate. Pass `"*"` for all messages.
102 /// * `listener` - A [`MessageQueueListener`] notified on queue assignment changes.
103 ///
104 /// # Errors
105 ///
106 /// Returns an error if the topic name is invalid, the expression cannot be parsed,
107 /// or the subscription cannot be registered with the broker.
108 async fn subscribe_with_listener<MQL>(
109 &self,
110 topic: &str,
111 sub_expression: &str,
112 listener: MQL,
113 ) -> rocketmq_error::RocketMQResult<()>
114 where
115 MQL: MessageQueueListener + 'static;
116
117 /// Subscribes to the specified topic using a [`MessageSelector`] for server-side filtering.
118 ///
119 /// This function does not block the calling thread.
120 ///
121 /// # Arguments
122 ///
123 /// * `topic` - The name of the topic to subscribe to.
124 /// * `selector` - The filter selector. Pass `None` to receive all messages.
125 ///
126 /// # Errors
127 ///
128 /// Returns an error if the topic name is invalid, the selector expression is rejected
129 /// by the broker, or the subscription cannot be registered.
130 async fn subscribe_with_selector(
131 &self,
132 topic: &str,
133 selector: Option<MessageSelector>,
134 ) -> rocketmq_error::RocketMQResult<()>;
135
136 /// Removes the subscription for the specified topic.
137 ///
138 /// Messages for this topic will no longer be fetched after the next rebalance cycle.
139 /// This function does not block the calling thread.
140 ///
141 /// # Arguments
142 ///
143 /// * `topic` - The name of the topic to unsubscribe from.
144 async fn unsubscribe(&self, topic: &str);
145
146 /// Returns the set of [`MessageQueue`]s currently assigned to this consumer.
147 ///
148 /// This function does not block the calling thread.
149 ///
150 /// # Errors
151 ///
152 /// Returns an error if the consumer is not in the running state.
153 async fn assignment(&self) -> rocketmq_error::RocketMQResult<HashSet<MessageQueue>>;
154
155 /// Manually assigns the given [`MessageQueue`]s to this consumer, bypassing broker rebalance.
156 ///
157 /// Any previously assigned queues not present in `message_queues` are removed.
158 /// This function does not block the calling thread.
159 ///
160 /// # Arguments
161 ///
162 /// * `message_queues` - The complete set of queues to assign to this consumer.
163 async fn assign(&self, message_queues: Vec<MessageQueue>);
164
165 /// Sets the subscription filter expression applied when fetching from manually assigned queues.
166 ///
167 /// This function does not block the calling thread.
168 ///
169 /// # Arguments
170 ///
171 /// * `topic` - The topic for which the filter expression applies.
172 /// * `sub_expression` - A tag expression or SQL-92 predicate used to filter messages.
173 async fn set_sub_expression_for_assign(&self, topic: &str, sub_expression: &str);
174
175 /// Populates `sub_expression_map` with the filter selector for each subscribed topic,
176 /// providing the subscription metadata required for heartbeat payloads.
177 ///
178 /// This function does not block the calling thread.
179 ///
180 /// # Arguments
181 ///
182 /// * `sub_expression_map` - Output map from topic name to its [`MessageSelector`]. Entries are
183 /// inserted for every topic that has an active subscription with a selector.
184 ///
185 /// # Errors
186 ///
187 /// Returns an error if the subscription metadata cannot be retrieved.
188 async fn build_subscriptions_for_heartbeat(
189 &self,
190 sub_expression_map: &mut HashMap<String, MessageSelector>,
191 ) -> rocketmq_error::RocketMQResult<()>;
192
193 /// Fetches the next batch of messages without allocating owned copies.
194 ///
195 /// Returns `ArcMut<MessageExt>` references to messages, providing shared mutable access
196 /// without heap allocation or deep cloning. The returned references remain valid until
197 /// they are dropped. Messages that need to outlive the poll scope must be cloned explicitly.
198 ///
199 /// This method uses the default poll timeout configured for the consumer.
200 ///
201 /// # Performance
202 ///
203 /// Message contents are not copied. For workloads processing messages without long-term
204 /// storage, this eliminates allocation overhead compared to [`poll()`].
205 ///
206 /// # Examples
207 ///
208 /// ```rust,ignore
209 /// let messages = consumer.poll_zero_copy().await;
210 /// for msg in &messages {
211 /// process_message(msg);
212 /// }
213 ///
214 /// // Clone only filtered messages
215 /// let messages = consumer.poll_zero_copy().await;
216 /// let important: Vec<MessageExt> = messages.into_iter()
217 /// .filter(|msg| is_important(msg))
218 /// .map(|msg| (*msg).clone())
219 /// .collect();
220 /// ```
221 ///
222 /// Returns an empty vector if no messages are available within the default timeout period.
223 /// This function does not block the calling thread.
224 async fn poll_zero_copy(&self) -> Vec<rocketmq_rust::ArcMut<MessageExt>>;
225
226 /// Fetches the next batch of messages without allocating owned copies, with a specified
227 /// timeout.
228 ///
229 /// Behaves identically to [`poll_zero_copy()`], but waits up to `timeout` milliseconds
230 /// for messages to become available.
231 ///
232 /// # Arguments
233 ///
234 /// * `timeout` - Maximum time to wait for messages, in milliseconds.
235 ///
236 /// # Examples
237 ///
238 /// ```rust,ignore
239 /// let messages = consumer.poll_with_timeout_zero_copy(1000).await;
240 /// ```
241 ///
242 /// Returns an empty vector if no messages are available before the timeout expires.
243 /// This function does not block the calling thread.
244 async fn poll_with_timeout_zero_copy(&self, timeout: u64) -> Vec<rocketmq_rust::ArcMut<MessageExt>>;
245
246 /// Fetches the next batch of messages, returning owned copies.
247 ///
248 /// Each returned message is cloned from the internal message store. The caller
249 /// owns the returned messages and may store them beyond the poll scope.
250 ///
251 /// This method uses the default poll timeout configured for the consumer.
252 ///
253 /// # Performance
254 ///
255 /// All messages are deep-cloned, including message body and properties. For a 2KB message,
256 /// each poll returning 32 messages allocates approximately 90KB. At 100 polls per second,
257 /// this results in approximately 9MB/s of allocations.
258 ///
259 /// For workloads that do not require owned messages, [`poll_zero_copy()`] avoids
260 /// this allocation overhead.
261 ///
262 /// # Examples
263 ///
264 /// ```rust,ignore
265 /// let messages = consumer.poll().await;
266 /// my_store.save(messages);
267 /// ```
268 ///
269 /// Returns an empty vector if no messages are available within the timeout period.
270 /// This function does not block the calling thread.
271 async fn poll(&self) -> Vec<MessageExt>;
272
273 /// Fetches the next batch of messages with a specified timeout, returning owned copies.
274 ///
275 /// Behaves identically to [`poll()`], but waits up to `timeout` milliseconds for
276 /// messages to become available. All messages are deep-cloned.
277 ///
278 /// # Arguments
279 ///
280 /// * `timeout` - Maximum time to wait for messages, in milliseconds.
281 ///
282 /// Returns an empty vector if no messages are available before the timeout expires.
283 /// This function does not block the calling thread.
284 async fn poll_with_timeout(&self, timeout: u64) -> Vec<MessageExt>;
285
286 /// Seeks the fetch position of the specified [`MessageQueue`] to the given offset.
287 ///
288 /// The next [`poll`] invocation will return messages starting from `offset`.
289 /// This function does not block the calling thread.
290 ///
291 /// # Arguments
292 ///
293 /// * `message_queue` - The queue whose fetch position is to be updated.
294 /// * `offset` - The target offset. Must be within the queue's valid range.
295 ///
296 /// # Errors
297 ///
298 /// Returns an error if the queue is not currently assigned to this consumer,
299 /// or if the specified offset is out of the valid range.
300 ///
301 /// [`poll`]: LitePullConsumerLocal::poll
302 async fn seek(&self, message_queue: &MessageQueue, offset: i64) -> rocketmq_error::RocketMQResult<()>;
303
304 /// Suspends message fetching for the specified [`MessageQueue`]s.
305 ///
306 /// Paused queues are excluded from subsequent [`poll`] results until [`resume`] is called.
307 /// This function does not block the calling thread.
308 ///
309 /// # Arguments
310 ///
311 /// * `message_queues` - The queues to pause.
312 ///
313 /// [`poll`]: LitePullConsumerLocal::poll
314 /// [`resume`]: LitePullConsumerLocal::resume
315 async fn pause(&self, message_queues: Vec<MessageQueue>);
316
317 /// Resumes message fetching for the specified [`MessageQueue`]s.
318 ///
319 /// This function does not block the calling thread.
320 ///
321 /// # Arguments
322 ///
323 /// * `message_queues` - The queues to resume.
324 async fn resume(&self, message_queues: Vec<MessageQueue>);
325
326 /// Returns whether automatic offset commit is enabled.
327 ///
328 /// This function does not block the calling thread.
329 async fn is_auto_commit(&self) -> bool;
330
331 /// Enables or disables automatic offset commit.
332 ///
333 /// When auto-commit is enabled, offsets are committed periodically without explicit calls
334 /// to [`commit`] or [`commit_sync`]. When disabled, the caller is responsible for committing
335 /// offsets.
336 ///
337 /// This function does not block the calling thread.
338 ///
339 /// # Arguments
340 ///
341 /// * `auto_commit` - `true` to enable automatic offset commit; `false` to disable it.
342 ///
343 /// [`commit`]: LitePullConsumerLocal::commit
344 /// [`commit_sync`]: LitePullConsumerLocal::commit_sync
345 async fn set_auto_commit(&self, auto_commit: bool);
346
347 /// Queries the broker for all [`MessageQueue`]s belonging to the specified topic.
348 ///
349 /// This function does not block the calling thread.
350 ///
351 /// # Arguments
352 ///
353 /// * `topic` - The name of the topic to query.
354 ///
355 /// # Errors
356 ///
357 /// Returns an error if the topic does not exist, if the name server is unreachable,
358 /// or if the consumer is not in the running state.
359 async fn fetch_message_queues(&self, topic: &str) -> rocketmq_error::RocketMQResult<Vec<MessageQueue>>;
360
361 /// Queries the broker for the offset corresponding to the given timestamp in a queue.
362 ///
363 /// This function does not block the calling thread.
364 ///
365 /// # Arguments
366 ///
367 /// * `message_queue` - The queue to query.
368 /// * `timestamp` - The Unix timestamp in milliseconds. The broker returns the offset of the
369 /// first message stored at or after this timestamp.
370 ///
371 /// # Errors
372 ///
373 /// Returns an error if the queue is not found on the broker or the query fails.
374 async fn offset_for_timestamp(
375 &self,
376 message_queue: &MessageQueue,
377 timestamp: u64,
378 ) -> rocketmq_error::RocketMQResult<i64>;
379
380 /// Commits all consumed offsets and waits for the broker to acknowledge the operation.
381 ///
382 /// This function does not block the calling thread.
383 ///
384 /// # Deprecation
385 ///
386 /// This method is deprecated. The name implies synchronous behavior, but the underlying
387 /// implementation relies on a background thread to commit offsets rather than committing
388 /// synchronously. Use [`commit`] instead.
389 ///
390 /// [`commit`]: LitePullConsumerLocal::commit
391 async fn commit_sync(&self);
392
393 /// Commits the provided offsets and optionally persists them to the broker.
394 ///
395 /// This function does not block the calling thread.
396 ///
397 /// # Deprecation
398 ///
399 /// This method is deprecated. The name implies synchronous behavior, but the underlying
400 /// implementation relies on a background thread to commit offsets rather than committing
401 /// synchronously. Use [`commit_with_map`] instead.
402 ///
403 /// # Arguments
404 ///
405 /// * `offset_map` - A map from [`MessageQueue`] to the offset to commit.
406 /// * `persist` - When `true`, the committed offsets are persisted to the broker immediately.
407 ///
408 /// [`commit_with_map`]: LitePullConsumerLocal::commit_with_map
409 async fn commit_sync_with_map(&self, offset_map: HashMap<MessageQueue, i64>, persist: bool);
410
411 /// Commits all consumed offsets asynchronously.
412 ///
413 /// This function does not block the calling thread. The commit is performed in the
414 /// background; use [`commit_sync`] if acknowledgment is required before proceeding.
415 ///
416 /// [`commit_sync`]: LitePullConsumerLocal::commit_sync
417 async fn commit(&self);
418
419 /// Commits the provided offsets asynchronously, optionally persisting them to the broker.
420 ///
421 /// This function does not block the calling thread.
422 ///
423 /// # Arguments
424 ///
425 /// * `offset_map` - A map from [`MessageQueue`] to the offset to commit.
426 /// * `persist` - When `true`, the committed offsets are persisted to the broker.
427 async fn commit_with_map(&self, offset_map: HashMap<MessageQueue, i64>, persist: bool);
428
429 /// Commits the offsets for the specified subset of assigned queues.
430 ///
431 /// This function does not block the calling thread.
432 ///
433 /// # Arguments
434 ///
435 /// * `message_queues` - The queues whose current offsets are to be committed.
436 /// * `persist` - When `true`, the committed offsets are persisted to the broker.
437 async fn commit_with_set(&self, message_queues: HashSet<MessageQueue>, persist: bool);
438
439 /// Returns the last committed offset for the specified [`MessageQueue`].
440 ///
441 /// This function does not block the calling thread.
442 ///
443 /// # Arguments
444 ///
445 /// * `message_queue` - The queue to query.
446 ///
447 /// # Errors
448 ///
449 /// Returns an error if the queue is not assigned to this consumer or if the offset
450 /// cannot be retrieved from the offset store.
451 async fn committed(&self, message_queue: &MessageQueue) -> rocketmq_error::RocketMQResult<i64>;
452
453 /// Registers a listener that is notified when the set of [`MessageQueue`]s for a topic changes.
454 ///
455 /// This function does not block the calling thread.
456 ///
457 /// # Arguments
458 ///
459 /// * `topic` - The topic to monitor for queue changes.
460 /// * `listener` - A [`TopicMessageQueueChangeListener`] invoked when the queue set changes.
461 ///
462 /// # Errors
463 ///
464 /// Returns an error if a listener is already registered for the given topic, or if the
465 /// registration fails due to an internal error.
466 async fn register_topic_message_queue_change_listener<TL>(
467 &self,
468 topic: &str,
469 listener: TL,
470 ) -> rocketmq_error::RocketMQResult<()>
471 where
472 TL: TopicMessageQueueChangeListener + 'static;
473
474 /// Updates the name server address used for topic route discovery.
475 ///
476 /// This function does not block the calling thread.
477 ///
478 /// # Arguments
479 ///
480 /// * `name_server_address` - The new semicolon-separated name server address list.
481 async fn update_name_server_address(&self, name_server_address: &str);
482
483 /// Seeks the fetch position of the specified [`MessageQueue`] to its earliest available offset.
484 ///
485 /// This function does not block the calling thread.
486 ///
487 /// # Arguments
488 ///
489 /// * `message_queue` - The queue to seek to the beginning.
490 ///
491 /// # Errors
492 ///
493 /// Returns an error if the queue is not assigned to this consumer or if the earliest
494 /// offset cannot be retrieved from the broker.
495 async fn seek_to_begin(&self, message_queue: &MessageQueue) -> rocketmq_error::RocketMQResult<()>;
496
497 /// Seeks the fetch position of the specified [`MessageQueue`] to its latest available offset.
498 ///
499 /// The next [`poll`] call will return only messages published after this point.
500 /// This function does not block the calling thread.
501 ///
502 /// # Arguments
503 ///
504 /// * `message_queue` - The queue to seek to the end.
505 ///
506 /// # Errors
507 ///
508 /// Returns an error if the queue is not assigned to this consumer or if the latest
509 /// offset cannot be retrieved from the broker.
510 ///
511 /// [`poll`]: LitePullConsumerLocal::poll
512 async fn seek_to_end(&self, message_queue: &MessageQueue) -> rocketmq_error::RocketMQResult<()>;
513
514 /// Commits all consumed offsets for all assigned queues.
515 ///
516 /// This method commits the current consumption offset for every assigned [`MessageQueue`].
517 /// Unlike [`commit`], which commits offsets asynchronously in the background, this method
518 /// ensures all offsets are persisted to the broker.
519 ///
520 /// This function does not block the calling thread.
521 ///
522 /// # Errors
523 ///
524 /// Returns an error if the consumer is not in the running state or if the offset
525 /// persistence fails.
526 ///
527 /// [`commit`]: LitePullConsumerLocal::commit
528 async fn commit_all(&self) -> rocketmq_error::RocketMQResult<()>;
529
530 /// Checks whether a specific [`MessageQueue`] is currently paused.
531 ///
532 /// A paused queue will not be fetched from during [`poll`] operations until it is resumed.
533 ///
534 /// This function does not block the calling thread.
535 ///
536 /// # Arguments
537 ///
538 /// * `message_queue` - The queue to check.
539 ///
540 /// # Returns
541 ///
542 /// `true` if the queue is paused, `false` otherwise.
543 ///
544 /// [`poll`]: LitePullConsumerLocal::poll
545 async fn is_paused(&self, message_queue: &MessageQueue) -> bool;
546}