rocketmq_client_rust/consumer/lite_pull_consumer.rs
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17use std::collections::HashMap;
18use std::collections::HashSet;
19
20use rocketmq_common::common::message::message_ext::MessageExt;
21use rocketmq_common::common::message::message_queue::MessageQueue;
22
23use crate::consumer::message_queue_listener::MessageQueueListener;
24use crate::consumer::message_selector::MessageSelector;
25use crate::consumer::topic_message_queue_change_listener::TopicMessageQueueChangeListener;
26
27#[trait_variant::make(LitePullConsumer: Send)]
28pub trait LitePullConsumerLocal: Sync {
29 /// Starts the LitePullConsumer.
30 ///
31 /// # Returns
32 ///
33 /// * `rocketmq_error::RocketMQResult<()>` - An empty result indicating success or failure.
34 async fn start(&self) -> rocketmq_error::RocketMQResult<()>;
35
36 /// Shuts down the LitePullConsumer.
37 async fn shutdown(&self);
38
39 /// Checks if the LitePullConsumer is running.
40 ///
41 /// # Returns
42 ///
43 /// * `bool` - `true` if the consumer is running, `false` otherwise.
44 async fn is_running(&self) -> bool;
45
46 /// Subscribes to a topic.
47 ///
48 /// # Arguments
49 ///
50 /// * `topic` - The name of the topic to subscribe to.
51 ///
52 /// # Returns
53 ///
54 /// * `rocketmq_error::RocketMQResult<()>` - An empty result indicating success or failure.
55 async fn subscribe(&self, topic: &str) -> rocketmq_error::RocketMQResult<()>;
56
57 /// Subscribes to a topic with a subscription expression.
58 ///
59 /// # Arguments
60 ///
61 /// * `topic` - The name of the topic to subscribe to.
62 /// * `sub_expression` - The subscription expression.
63 ///
64 /// # Returns
65 ///
66 /// * `rocketmq_error::RocketMQResult<()>` - An empty result indicating success or failure.
67 async fn subscribe_with_expression(
68 &self,
69 topic: &str,
70 sub_expression: &str,
71 ) -> rocketmq_error::RocketMQResult<()>;
72
73 /// Subscribes to a topic with a subscription expression and a message queue listener.
74 ///
75 /// # Arguments
76 ///
77 /// * `topic` - The name of the topic to subscribe to.
78 /// * `sub_expression` - The subscription expression.
79 /// * `listener` - The message queue listener.
80 ///
81 /// # Returns
82 ///
83 /// * `rocketmq_error::RocketMQResult<()>` - An empty result indicating success or failure.
84 async fn subscribe_with_listener<MQL>(
85 &self,
86 topic: &str,
87 sub_expression: &str,
88 listener: MQL,
89 ) -> rocketmq_error::RocketMQResult<()>
90 where
91 MQL: MessageQueueListener;
92
93 /// Subscribes to a topic with a message selector.
94 ///
95 /// # Arguments
96 ///
97 /// * `topic` - The name of the topic to subscribe to.
98 /// * `selector` - The message selector.
99 ///
100 /// # Returns
101 ///
102 /// * `rocketmq_error::RocketMQResult<()>` - An empty result indicating success or failure.
103 async fn subscribe_with_selector(
104 &self,
105 topic: &str,
106 selector: Option<MessageSelector>,
107 ) -> rocketmq_error::RocketMQResult<()>;
108 /// Unsubscribes from a topic.
109 ///
110 /// # Arguments
111 ///
112 /// * `topic` - The name of the topic to unsubscribe from.
113 async fn unsubscribe(&self, topic: &str);
114
115 /// Retrieves the current assignment of message queues.
116 ///
117 /// # Returns
118 ///
119 /// * `rocketmq_error::RocketMQResult<HashSet<MessageQueue>>` - A set of assigned message queues
120 /// or an error.
121 async fn assignment(&self) -> rocketmq_error::RocketMQResult<HashSet<MessageQueue>>;
122
123 /// Assigns a list of message queues to the consumer.
124 ///
125 /// # Arguments
126 ///
127 /// * `message_queues` - A vector of `MessageQueue` instances to assign.
128 async fn assign(&self, message_queues: Vec<MessageQueue>);
129
130 /// Sets the subscription expression for an assigned topic.
131 ///
132 /// # Arguments
133 ///
134 /// * `topic` - The name of the topic.
135 /// * `sub_expression` - The subscription expression.
136 async fn set_sub_expression_for_assign(&self, topic: &str, sub_expression: &str);
137
138 /// Polls for messages.
139 ///
140 /// # Returns
141 ///
142 /// * `Vec<MessageExt>` - A vector of polled messages.
143 async fn poll(&self) -> Vec<MessageExt>;
144
145 /// Polls for messages with a timeout.
146 ///
147 /// # Arguments
148 ///
149 /// * `timeout` - The timeout duration in milliseconds.
150 ///
151 /// # Returns
152 ///
153 /// * `Vec<MessageExt>` - A vector of polled messages.
154 async fn poll_with_timeout(&self, timeout: u64) -> Vec<MessageExt>;
155
156 /// Seeks to a specific offset in a message queue.
157 ///
158 /// # Arguments
159 ///
160 /// * `message_queue` - The message queue to seek.
161 /// * `offset` - The offset to seek to.
162 ///
163 /// # Returns
164 ///
165 /// * `rocketmq_error::RocketMQResult<()>` - An empty result indicating success or failure.
166 async fn seek(
167 &self,
168 message_queue: &MessageQueue,
169 offset: i64,
170 ) -> rocketmq_error::RocketMQResult<()>;
171
172 /// Pauses message consumption for the specified message queues.
173 ///
174 /// # Arguments
175 ///
176 /// * `message_queues` - A vector of `MessageQueue` instances to pause.
177 async fn pause(&self, message_queues: Vec<MessageQueue>);
178
179 /// Resumes message consumption for the specified message queues.
180 ///
181 /// # Arguments
182 ///
183 /// * `message_queues` - A vector of `MessageQueue` instances to resume.
184 async fn resume(&self, message_queues: Vec<MessageQueue>);
185
186 /// Checks if auto-commit is enabled.
187 ///
188 /// # Returns
189 ///
190 /// * `bool` - `true` if auto-commit is enabled, `false` otherwise.
191 async fn is_auto_commit(&self) -> bool;
192
193 /// Sets the auto-commit mode.
194 ///
195 /// # Arguments
196 ///
197 /// * `auto_commit` - `true` to enable auto-commit, `false` to disable it.
198 async fn set_auto_commit(&self, auto_commit: bool);
199
200 /// Fetches the message queues for a topic.
201 ///
202 /// # Arguments
203 ///
204 /// * `topic` - The name of the topic.
205 ///
206 /// # Returns
207 ///
208 /// * `rocketmq_error::RocketMQResult<Vec<MessageQueue>>` - A vector of message queues or an
209 /// error.
210 async fn fetch_message_queues(
211 &self,
212 topic: &str,
213 ) -> rocketmq_error::RocketMQResult<Vec<MessageQueue>>;
214 /// Retrieves the offset for a given timestamp in a message queue.
215 ///
216 /// # Arguments
217 ///
218 /// * `message_queue` - The message queue to query.
219 /// * `timestamp` - The timestamp to query the offset for.
220 ///
221 /// # Returns
222 ///
223 /// * `rocketmq_error::RocketMQResult<i64>` - The offset corresponding to the given timestamp or
224 /// an error.
225 async fn offset_for_timestamp(
226 &self,
227 message_queue: &MessageQueue,
228 timestamp: u64,
229 ) -> rocketmq_error::RocketMQResult<i64>;
230
231 /// Commits the current offsets synchronously.
232 async fn commit_sync(&self);
233
234 /// Commits the provided offsets synchronously.
235 ///
236 /// # Arguments
237 ///
238 /// * `offset_map` - A map of message queues to offsets.
239 /// * `persist` - Whether to persist the offsets.
240 async fn commit_sync_with_map(&self, offset_map: HashMap<MessageQueue, i64>, persist: bool);
241
242 /// Commits the current offsets.
243 async fn commit(&self);
244
245 /// Commits the provided offsets.
246 ///
247 /// # Arguments
248 ///
249 /// * `offset_map` - A map of message queues to offsets.
250 /// * `persist` - Whether to persist the offsets.
251 async fn commit_with_map(&self, offset_map: HashMap<MessageQueue, i64>, persist: bool);
252
253 /// Commits the offsets for the provided message queues.
254 ///
255 /// # Arguments
256 ///
257 /// * `message_queues` - A set of message queues to commit offsets for.
258 /// * `persist` - Whether to persist the offsets.
259 async fn commit_with_set(&self, message_queues: HashSet<MessageQueue>, persist: bool);
260
261 /// Retrieves the committed offset for a message queue.
262 ///
263 /// # Arguments
264 ///
265 /// * `message_queue` - The message queue to query.
266 ///
267 /// # Returns
268 ///
269 /// * `rocketmq_error::RocketMQResult<i64>` - The committed offset or an error.
270 async fn committed(&self, message_queue: &MessageQueue) -> rocketmq_error::RocketMQResult<i64>;
271
272 /// Registers a listener for changes to the message queues of a topic.
273 ///
274 /// # Arguments
275 ///
276 /// * `topic` - The name of the topic.
277 /// * `listener` - The listener to register.
278 ///
279 /// # Returns
280 ///
281 /// * `rocketmq_error::RocketMQResult<()>` - An empty result indicating success or failure.
282 async fn register_topic_message_queue_change_listener<TL>(
283 &self,
284 topic: &str,
285 listener: TL,
286 ) -> rocketmq_error::RocketMQResult<()>
287 where
288 TL: TopicMessageQueueChangeListener;
289
290 /// Updates the name server address.
291 ///
292 /// # Arguments
293 ///
294 /// * `name_server_address` - The new name server address.
295 async fn update_name_server_address(&self, name_server_address: &str);
296
297 /// Seeks to the beginning of a message queue.
298 ///
299 /// # Arguments
300 ///
301 /// * `message_queue` - The message queue to seek.
302 ///
303 /// # Returns
304 ///
305 /// * `rocketmq_error::RocketMQResult<()>` - An empty result indicating success or failure.
306 async fn seek_to_begin(
307 &self,
308 message_queue: &MessageQueue,
309 ) -> rocketmq_error::RocketMQResult<()>;
310
311 /// Seeks to the end of a message queue.
312 ///
313 /// # Arguments
314 ///
315 /// * `message_queue` - The message queue to seek.
316 ///
317 /// # Returns
318 ///
319 /// * `rocketmq_error::RocketMQResult<()>` - An empty result indicating success or failure.
320 async fn seek_to_end(&self, message_queue: &MessageQueue)
321 -> rocketmq_error::RocketMQResult<()>;
322}