danube_client/consumer.rs
1use crate::{
2 errors::{DanubeError, Result},
3 retry_manager::RetryManager,
4 topic_consumer::TopicConsumer,
5 DanubeClient,
6};
7
8use danube_core::message::StreamMessage;
9use futures::{future::join_all, StreamExt};
10use std::collections::HashMap;
11use std::sync::{
12 atomic::{AtomicBool, Ordering},
13 Arc,
14};
15use tokio::sync::{mpsc, Mutex};
16use tokio::task::JoinHandle;
17
/// The subscription mode a consumer requests when attaching to a topic.
///
/// The mode decides how many consumers may be attached to the same
/// subscription and which of them receive messages.
#[derive(Debug, Clone)]
pub enum SubType {
    /// Only one consumer can subscribe to the topic at a time.
    Exclusive,
    /// Multiple consumers can subscribe to the topic concurrently.
    Shared,
    /// Only one consumer is active at a time. Additional consumers may
    /// subscribe, but they wait in standby and take over if the active
    /// consumer disconnects.
    FailOver,
}
31
32/// Consumer represents a message consumer that subscribes to a topic and receives messages.
33/// It handles communication with the message broker and manages the consumer's state.
34#[derive(Debug)]
35pub struct Consumer {
36 // the Danube client
37 client: DanubeClient,
38 // the topic name, from where the messages are consumed
39 topic_name: String,
40 // the name of the Consumer
41 consumer_name: String,
42 // the map between the partitioned topic name and the consumer instance
43 consumers: HashMap<String, Arc<Mutex<TopicConsumer>>>,
44 // the name of the subscription the consumer is attached to
45 subscription: String,
46 // the type of the subscription, that can be Shared and Exclusive
47 subscription_type: SubType,
48 // other configurable options for the consumer
49 consumer_options: ConsumerOptions,
50 // shutdown flag and task handles for graceful close
51 shutdown: Arc<AtomicBool>,
52 task_handles: Vec<JoinHandle<()>>,
53}
54
55impl Consumer {
56 pub(crate) fn new(
57 client: DanubeClient,
58 topic_name: String,
59 consumer_name: String,
60 subscription: String,
61 sub_type: Option<SubType>,
62 consumer_options: ConsumerOptions,
63 ) -> Self {
64 let subscription_type = if let Some(sub_type) = sub_type {
65 sub_type
66 } else {
67 SubType::Shared
68 };
69
70 Consumer {
71 client,
72 topic_name,
73 consumer_name,
74 consumers: HashMap::new(),
75 subscription,
76 subscription_type,
77 consumer_options,
78 shutdown: Arc::new(AtomicBool::new(false)),
79 task_handles: Vec::new(),
80 }
81 }
82
83 /// Initializes the subscription to a non-partitioned or partitioned topic and starts the health check service.
84 ///
85 /// This function establishes a gRPC connection with the brokers and requests to subscribe to the specified topic.
86 ///
87 /// # Errors
88 /// If an error occurs during subscription or initialization, it is returned as part of the `Err` variant.
89 pub async fn subscribe(&mut self) -> Result<()> {
90 // Get partitions from the topic
91 let partitions = self
92 .client
93 .lookup_service
94 .topic_partitions(&self.client.uri, &self.topic_name)
95 .await?;
96
97 // Create TopicConsumer for each partition
98 let mut tasks = Vec::new();
99 for topic_partition in partitions {
100 let topic_name = topic_partition.clone();
101 let consumer_name = self.consumer_name.clone();
102 let subscription = self.subscription.clone();
103 let subscription_type = self.subscription_type.clone();
104 let consumer_options = self.consumer_options.clone();
105 let client = self.client.clone();
106
107 let task = tokio::spawn(async move {
108 let mut topic_consumer = TopicConsumer::new(
109 client,
110 topic_name,
111 consumer_name,
112 subscription,
113 Some(subscription_type),
114 consumer_options,
115 );
116 match topic_consumer.subscribe().await {
117 Ok(_) => Ok(topic_consumer),
118 Err(e) => Err(e),
119 }
120 });
121
122 tasks.push(task);
123 }
124
125 // Wait for all tasks to complete
126 let results = join_all(tasks).await;
127
128 // Collect results
129 let mut topic_consumers = HashMap::new();
130 for result in results {
131 match result {
132 Ok(Ok(consumer)) => {
133 topic_consumers.insert(
134 consumer.get_topic_name().to_string(),
135 Arc::new(Mutex::new(consumer)),
136 );
137 }
138 Ok(Err(e)) => return Err(e),
139 Err(e) => return Err(DanubeError::Unrecoverable(e.to_string())),
140 }
141 }
142
143 if topic_consumers.is_empty() {
144 return Err(DanubeError::Unrecoverable(
145 "No partitions found".to_string(),
146 ));
147 }
148
149 self.consumers.extend(topic_consumers.into_iter());
150 Ok(())
151 }
152
153 /// Starts receiving messages from the subscribed partitioned or non-partitioned topic.
154 ///
155 /// This function continuously polls for new messages and handles them as long as the `stop_signal` has not been set to `true`.
156 ///
157 /// # Returns
158 ///
159 /// A `Result` with:
160 /// - `Ok(mpsc::Receiver<StreamMessage>)` if the receive client is successfully created and ready to receive messages.
161 /// - `Err(e)` if the receive client cannot be created or if other issues occur.
162 pub async fn receive(&mut self) -> Result<mpsc::Receiver<StreamMessage>> {
163 // Create a channel to send messages to the client
164 let (tx, rx) = mpsc::channel(100); // Buffer size of 100, adjust as needed
165
166 // Create retry manager for consumers
167 let retry_manager = RetryManager::new(
168 self.consumer_options.max_retries,
169 self.consumer_options.base_backoff_ms,
170 self.consumer_options.max_backoff_ms,
171 );
172
173 // Spawn a task for each cloned TopicConsumer
174 for (_, consumer) in &self.consumers {
175 let tx = tx.clone();
176 let consumer = Arc::clone(consumer);
177 let retry_manager = retry_manager.clone();
178 let shutdown = self.shutdown.clone();
179
180 let handle: JoinHandle<()> = tokio::spawn(async move {
181 let mut attempts = 0;
182 let max_retries = if retry_manager.max_retries() == 0 {
183 5
184 } else {
185 retry_manager.max_retries()
186 };
187
188 loop {
189 if shutdown.load(Ordering::SeqCst) {
190 return;
191 }
192 // Try to get stream from consumer (subscribe is handled internally with its own retry)
193 let stream_result = {
194 let mut locked = consumer.lock().await;
195 locked.receive().await
196 };
197
198 match stream_result {
199 Ok(mut stream) => {
200 attempts = 0; // Reset attempts on successful connection
201
202 // Process messages until stream ends or errors
203 while !shutdown.load(Ordering::SeqCst) {
204 let message_opt = stream.next().await;
205 if message_opt.is_none() {
206 break;
207 }
208 let message = message_opt.unwrap();
209 match message {
210 Ok(stream_message) => {
211 let message: StreamMessage = stream_message.into();
212 if tx.send(message).await.is_err() {
213 // Channel is closed, exit the task
214 return;
215 }
216 }
217 Err(e) => {
218 eprintln!("Error receiving message: {}", e);
219 break; // Stream error, will retry receive
220 }
221 }
222 }
223 // Stream ended, retry receive
224 }
225 Err(error) => {
226 if shutdown.load(Ordering::SeqCst) {
227 return;
228 }
229 // Check if this is an unrecoverable error (e.g., stream client not initialized)
230 if matches!(error, DanubeError::Unrecoverable(_)) {
231 eprintln!(
232 "Unrecoverable error detected, attempting resubscription: {:?}",
233 error
234 );
235
236 // Attempt to resubscribe for unrecoverable errors
237 let resubscribe_result = {
238 let mut locked = consumer.lock().await;
239 locked.subscribe().await
240 };
241
242 match resubscribe_result {
243 Ok(_) => {
244 eprintln!("Resubscription successful after unrecoverable error, continuing...");
245 attempts = 0; // Reset attempts after successful resubscription
246 continue; // Go back to creating stream_result
247 }
248 Err(e) => {
249 eprintln!(
250 "Resubscription failed after unrecoverable error: {:?}",
251 e
252 );
253 return; // Exit task if resubscription fails
254 }
255 }
256 }
257
258 // Failed to get stream, check if retryable
259 if retry_manager.is_retryable_error(&error) {
260 attempts += 1;
261 if attempts > max_retries {
262 eprintln!("Max retries exceeded for consumer receive, attempting resubscription");
263
264 // Attempt to resubscribe
265 let resubscribe_result = {
266 let mut locked = consumer.lock().await;
267 locked.subscribe().await
268 };
269
270 match resubscribe_result {
271 Ok(_) => {
272 eprintln!("Resubscription successful, continuing...");
273 break; // Break out of retry loop and go back to creating stream_result
274 }
275 Err(e) => {
276 eprintln!("Resubscription failed: {:?}", e);
277 return; // Exit task if resubscription fails
278 }
279 }
280 }
281 let backoff = retry_manager.calculate_backoff(attempts - 1);
282 tokio::time::sleep(backoff).await;
283 } else {
284 eprintln!("Non-retryable error in consumer receive: {:?}", error);
285 return; // Non-retryable error
286 }
287 }
288 }
289 }
290 });
291 self.task_handles.push(handle);
292 }
293
294 Ok(rx)
295 }
296
297 pub async fn ack(&mut self, message: &StreamMessage) -> Result<()> {
298 let topic_name = message.msg_id.topic_name.clone();
299 let topic_consumer = self.consumers.get_mut(&topic_name);
300 if let Some(topic_consumer) = topic_consumer {
301 let mut topic_consumer = topic_consumer.lock().await;
302 let _ = topic_consumer
303 .send_ack(
304 message.request_id,
305 message.msg_id.clone(),
306 &self.subscription,
307 )
308 .await?;
309 }
310 Ok(())
311 }
312
313 /// Gracefully close all receive tasks and stop background activities for this consumer
314 pub async fn close(&mut self) {
315 // signal shutdown
316 self.shutdown.store(true, Ordering::SeqCst);
317 // stop topic-level activities (e.g., health checks)
318 for (_, topic_consumer) in self.consumers.iter() {
319 let locked = topic_consumer.lock().await;
320 locked.stop();
321 }
322 // abort receive tasks
323 for handle in self.task_handles.drain(..) {
324 handle.abort();
325 }
326 // small delay to allow server to observe closure
327 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
328 }
329}
330
331/// ConsumerBuilder is a builder for creating a new Consumer instance.
332///
333/// It allows setting various properties for the consumer such as topic, name, subscription,
334/// subscription type, and options.
335#[derive(Debug, Clone)]
336pub struct ConsumerBuilder {
337 client: DanubeClient,
338 topic: Option<String>,
339 consumer_name: Option<String>,
340 subscription: Option<String>,
341 subscription_type: Option<SubType>,
342 consumer_options: ConsumerOptions,
343}
344
345impl ConsumerBuilder {
346 pub fn new(client: &DanubeClient) -> Self {
347 ConsumerBuilder {
348 client: client.clone(),
349 topic: None,
350 consumer_name: None,
351 subscription: None,
352 subscription_type: None,
353 consumer_options: ConsumerOptions::default(),
354 }
355 }
356
357 /// Sets the topic name for the consumer.
358 ///
359 /// This method specifies the topic that the consumer will subscribe to. It is a required field and must be set before the consumer can be created.
360 ///
361 /// # Parameters
362 ///
363 /// - `topic`: The name of the topic for the consumer. This should be a non-empty string that corresponds to an existing topic.
364 pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
365 self.topic = Some(topic.into());
366 self
367 }
368
369 /// Sets the name of the consumer instance.
370 ///
371 /// This method specifies the name to be assigned to the consumer. It is a required field and must be set before the consumer can be created.
372 ///
373 /// # Parameters
374 ///
375 /// - `consumer_name`: The name for the consumer instance. This should be a non-empty string that uniquely identifies the consumer.
376 pub fn with_consumer_name(mut self, consumer_name: impl Into<String>) -> Self {
377 self.consumer_name = Some(consumer_name.into());
378 self
379 }
380
381 /// Sets the name of the subscription for the consumer.
382 ///
383 /// This method specifies the subscription that the consumer will use. It is a required field and must be set before the consumer can be created.
384 ///
385 /// # Parameters
386 ///
387 /// - `subscription_name`: The name of the subscription. This should be a non-empty string that identifies the subscription to which the consumer will be subscribed.
388 pub fn with_subscription(mut self, subscription_name: impl Into<String>) -> Self {
389 self.subscription = Some(subscription_name.into());
390 self
391 }
392
393 /// Sets the type of subscription for the consumer. This field is optional.
394 ///
395 /// This method specifies the type of subscription that the consumer will use. The subscription type determines how messages are distributed to consumers that share the same subscription.
396 ///
397 /// # Parameters
398 ///
399 /// - `sub_type`: The type of subscription. This should be one of the following:
400 /// - `SubType::Exclusive`: The consumer exclusively receives all messages for the subscription.
401 /// - `SubType::Shared`: Messages are distributed among multiple consumers sharing the same subscription. Default if not specified.
402 /// - `SubType::FailOver`: Only one consumer receives messages, and if it fails, another consumer takes over.
403 pub fn with_subscription_type(mut self, subscription_type: SubType) -> Self {
404 self.subscription_type = Some(subscription_type);
405 self
406 }
407
408 /// Creates a new `Consumer` instance using the settings configured in the `ConsumerBuilder`.
409 ///
410 /// This method performs validation to ensure that all required fields are set before creating the `Consumer`. Once validation is successful, it constructs and returns a new `Consumer` instance configured with the specified settings.
411 ///
412 /// # Returns
413 ///
414 /// - A `Consumer` instance if the builder configuration is valid and the consumer is created successfully.
415 pub fn build(self) -> Consumer {
416 let topic = self.topic.expect("you should specify the topic");
417 let consumer_name = self
418 .consumer_name
419 .expect("you should provide a name for the consumer");
420 let subscription = self
421 .subscription
422 .expect("you should provide the name of the subscription");
423 Consumer::new(
424 self.client,
425 topic,
426 consumer_name,
427 subscription,
428 self.subscription_type,
429 self.consumer_options,
430 )
431 }
432}
433
/// Tunable settings for a consumer.
///
/// `Default` yields an empty `others` string and `0` for every numeric field.
#[derive(Debug, Clone, Default)]
pub struct ConsumerOptions {
    /// Reserved for future use.
    pub others: String,
    /// Maximum number of retry attempts.
    pub max_retries: usize,
    /// Base backoff, in milliseconds, for exponential backoff.
    pub base_backoff_ms: u64,
    /// Upper bound, in milliseconds, on the backoff.
    pub max_backoff_ms: u64,
}