rocketmq_client_rust/consumer/
lite_pull_consumer.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::collections::HashMap;
18use std::collections::HashSet;
19
20use rocketmq_common::common::message::message_ext::MessageExt;
21use rocketmq_common::common::message::message_queue::MessageQueue;
22
23use crate::consumer::message_queue_listener::MessageQueueListener;
24use crate::consumer::message_selector::MessageSelector;
25use crate::consumer::topic_message_queue_change_listener::TopicMessageQueueChangeListener;
26
27#[trait_variant::make(LitePullConsumer: Send)]
28pub trait LitePullConsumerLocal: Sync {
29    /// Starts the LitePullConsumer.
30    ///
31    /// # Returns
32    ///
33    /// * `rocketmq_error::RocketMQResult<()>` - An empty result indicating success or failure.
34    async fn start(&self) -> rocketmq_error::RocketMQResult<()>;
35
36    /// Shuts down the LitePullConsumer.
37    async fn shutdown(&self);
38
39    /// Checks if the LitePullConsumer is running.
40    ///
41    /// # Returns
42    ///
43    /// * `bool` - `true` if the consumer is running, `false` otherwise.
44    async fn is_running(&self) -> bool;
45
46    /// Subscribes to a topic.
47    ///
48    /// # Arguments
49    ///
50    /// * `topic` - The name of the topic to subscribe to.
51    ///
52    /// # Returns
53    ///
54    /// * `rocketmq_error::RocketMQResult<()>` - An empty result indicating success or failure.
55    async fn subscribe(&self, topic: &str) -> rocketmq_error::RocketMQResult<()>;
56
57    /// Subscribes to a topic with a subscription expression.
58    ///
59    /// # Arguments
60    ///
61    /// * `topic` - The name of the topic to subscribe to.
62    /// * `sub_expression` - The subscription expression.
63    ///
64    /// # Returns
65    ///
66    /// * `rocketmq_error::RocketMQResult<()>` - An empty result indicating success or failure.
67    async fn subscribe_with_expression(
68        &self,
69        topic: &str,
70        sub_expression: &str,
71    ) -> rocketmq_error::RocketMQResult<()>;
72
73    /// Subscribes to a topic with a subscription expression and a message queue listener.
74    ///
75    /// # Arguments
76    ///
77    /// * `topic` - The name of the topic to subscribe to.
78    /// * `sub_expression` - The subscription expression.
79    /// * `listener` - The message queue listener.
80    ///
81    /// # Returns
82    ///
83    /// * `rocketmq_error::RocketMQResult<()>` - An empty result indicating success or failure.
84    async fn subscribe_with_listener<MQL>(
85        &self,
86        topic: &str,
87        sub_expression: &str,
88        listener: MQL,
89    ) -> rocketmq_error::RocketMQResult<()>
90    where
91        MQL: MessageQueueListener;
92
93    /// Subscribes to a topic with a message selector.
94    ///
95    /// # Arguments
96    ///
97    /// * `topic` - The name of the topic to subscribe to.
98    /// * `selector` - The message selector.
99    ///
100    /// # Returns
101    ///
102    /// * `rocketmq_error::RocketMQResult<()>` - An empty result indicating success or failure.
103    async fn subscribe_with_selector(
104        &self,
105        topic: &str,
106        selector: Option<MessageSelector>,
107    ) -> rocketmq_error::RocketMQResult<()>;
108    /// Unsubscribes from a topic.
109    ///
110    /// # Arguments
111    ///
112    /// * `topic` - The name of the topic to unsubscribe from.
113    async fn unsubscribe(&self, topic: &str);
114
115    /// Retrieves the current assignment of message queues.
116    ///
117    /// # Returns
118    ///
119    /// * `rocketmq_error::RocketMQResult<HashSet<MessageQueue>>` - A set of assigned message queues
120    ///   or an error.
121    async fn assignment(&self) -> rocketmq_error::RocketMQResult<HashSet<MessageQueue>>;
122
123    /// Assigns a list of message queues to the consumer.
124    ///
125    /// # Arguments
126    ///
127    /// * `message_queues` - A vector of `MessageQueue` instances to assign.
128    async fn assign(&self, message_queues: Vec<MessageQueue>);
129
130    /// Sets the subscription expression for an assigned topic.
131    ///
132    /// # Arguments
133    ///
134    /// * `topic` - The name of the topic.
135    /// * `sub_expression` - The subscription expression.
136    async fn set_sub_expression_for_assign(&self, topic: &str, sub_expression: &str);
137
138    /// Polls for messages.
139    ///
140    /// # Returns
141    ///
142    /// * `Vec<MessageExt>` - A vector of polled messages.
143    async fn poll(&self) -> Vec<MessageExt>;
144
145    /// Polls for messages with a timeout.
146    ///
147    /// # Arguments
148    ///
149    /// * `timeout` - The timeout duration in milliseconds.
150    ///
151    /// # Returns
152    ///
153    /// * `Vec<MessageExt>` - A vector of polled messages.
154    async fn poll_with_timeout(&self, timeout: u64) -> Vec<MessageExt>;
155
156    /// Seeks to a specific offset in a message queue.
157    ///
158    /// # Arguments
159    ///
160    /// * `message_queue` - The message queue to seek.
161    /// * `offset` - The offset to seek to.
162    ///
163    /// # Returns
164    ///
165    /// * `rocketmq_error::RocketMQResult<()>` - An empty result indicating success or failure.
166    async fn seek(
167        &self,
168        message_queue: &MessageQueue,
169        offset: i64,
170    ) -> rocketmq_error::RocketMQResult<()>;
171
172    /// Pauses message consumption for the specified message queues.
173    ///
174    /// # Arguments
175    ///
176    /// * `message_queues` - A vector of `MessageQueue` instances to pause.
177    async fn pause(&self, message_queues: Vec<MessageQueue>);
178
179    /// Resumes message consumption for the specified message queues.
180    ///
181    /// # Arguments
182    ///
183    /// * `message_queues` - A vector of `MessageQueue` instances to resume.
184    async fn resume(&self, message_queues: Vec<MessageQueue>);
185
186    /// Checks if auto-commit is enabled.
187    ///
188    /// # Returns
189    ///
190    /// * `bool` - `true` if auto-commit is enabled, `false` otherwise.
191    async fn is_auto_commit(&self) -> bool;
192
193    /// Sets the auto-commit mode.
194    ///
195    /// # Arguments
196    ///
197    /// * `auto_commit` - `true` to enable auto-commit, `false` to disable it.
198    async fn set_auto_commit(&self, auto_commit: bool);
199
200    /// Fetches the message queues for a topic.
201    ///
202    /// # Arguments
203    ///
204    /// * `topic` - The name of the topic.
205    ///
206    /// # Returns
207    ///
208    /// * `rocketmq_error::RocketMQResult<Vec<MessageQueue>>` - A vector of message queues or an
209    ///   error.
210    async fn fetch_message_queues(
211        &self,
212        topic: &str,
213    ) -> rocketmq_error::RocketMQResult<Vec<MessageQueue>>;
214    /// Retrieves the offset for a given timestamp in a message queue.
215    ///
216    /// # Arguments
217    ///
218    /// * `message_queue` - The message queue to query.
219    /// * `timestamp` - The timestamp to query the offset for.
220    ///
221    /// # Returns
222    ///
223    /// * `rocketmq_error::RocketMQResult<i64>` - The offset corresponding to the given timestamp or
224    ///   an error.
225    async fn offset_for_timestamp(
226        &self,
227        message_queue: &MessageQueue,
228        timestamp: u64,
229    ) -> rocketmq_error::RocketMQResult<i64>;
230
231    /// Commits the current offsets synchronously.
232    async fn commit_sync(&self);
233
234    /// Commits the provided offsets synchronously.
235    ///
236    /// # Arguments
237    ///
238    /// * `offset_map` - A map of message queues to offsets.
239    /// * `persist` - Whether to persist the offsets.
240    async fn commit_sync_with_map(&self, offset_map: HashMap<MessageQueue, i64>, persist: bool);
241
242    /// Commits the current offsets.
243    async fn commit(&self);
244
245    /// Commits the provided offsets.
246    ///
247    /// # Arguments
248    ///
249    /// * `offset_map` - A map of message queues to offsets.
250    /// * `persist` - Whether to persist the offsets.
251    async fn commit_with_map(&self, offset_map: HashMap<MessageQueue, i64>, persist: bool);
252
253    /// Commits the offsets for the provided message queues.
254    ///
255    /// # Arguments
256    ///
257    /// * `message_queues` - A set of message queues to commit offsets for.
258    /// * `persist` - Whether to persist the offsets.
259    async fn commit_with_set(&self, message_queues: HashSet<MessageQueue>, persist: bool);
260
261    /// Retrieves the committed offset for a message queue.
262    ///
263    /// # Arguments
264    ///
265    /// * `message_queue` - The message queue to query.
266    ///
267    /// # Returns
268    ///
269    /// * `rocketmq_error::RocketMQResult<i64>` - The committed offset or an error.
270    async fn committed(&self, message_queue: &MessageQueue) -> rocketmq_error::RocketMQResult<i64>;
271
272    /// Registers a listener for changes to the message queues of a topic.
273    ///
274    /// # Arguments
275    ///
276    /// * `topic` - The name of the topic.
277    /// * `listener` - The listener to register.
278    ///
279    /// # Returns
280    ///
281    /// * `rocketmq_error::RocketMQResult<()>` - An empty result indicating success or failure.
282    async fn register_topic_message_queue_change_listener<TL>(
283        &self,
284        topic: &str,
285        listener: TL,
286    ) -> rocketmq_error::RocketMQResult<()>
287    where
288        TL: TopicMessageQueueChangeListener;
289
290    /// Updates the name server address.
291    ///
292    /// # Arguments
293    ///
294    /// * `name_server_address` - The new name server address.
295    async fn update_name_server_address(&self, name_server_address: &str);
296
297    /// Seeks to the beginning of a message queue.
298    ///
299    /// # Arguments
300    ///
301    /// * `message_queue` - The message queue to seek.
302    ///
303    /// # Returns
304    ///
305    /// * `rocketmq_error::RocketMQResult<()>` - An empty result indicating success or failure.
306    async fn seek_to_begin(
307        &self,
308        message_queue: &MessageQueue,
309    ) -> rocketmq_error::RocketMQResult<()>;
310
311    /// Seeks to the end of a message queue.
312    ///
313    /// # Arguments
314    ///
315    /// * `message_queue` - The message queue to seek.
316    ///
317    /// # Returns
318    ///
319    /// * `rocketmq_error::RocketMQResult<()>` - An empty result indicating success or failure.
320    async fn seek_to_end(&self, message_queue: &MessageQueue)
321        -> rocketmq_error::RocketMQResult<()>;
322}