danube_client/
consumer.rs

1use crate::{
2    errors::{DanubeError, Result},
3    retry_manager::RetryManager,
4    topic_consumer::TopicConsumer,
5    DanubeClient,
6};
7
8use danube_core::message::StreamMessage;
9use futures::{future::join_all, StreamExt};
10use std::collections::HashMap;
11use std::sync::{
12    atomic::{AtomicBool, Ordering},
13    Arc,
14};
15use tokio::sync::{mpsc, Mutex};
16use tokio::task::JoinHandle;
17
18/// Represents the type of subscription
19///
20/// Variants:
21/// - `Exclusive`: Only one consumer can subscribe to the topic at a time.
22/// - `Shared`: Multiple consumers can subscribe to the topic concurrently.
23/// - `FailOver`: Only one consumer can subscribe to the topic at a time,
24///             multiple can subscribe but waits in standby, if the active consumer disconnect        
25#[derive(Debug, Clone)]
26pub enum SubType {
27    Exclusive,
28    Shared,
29    FailOver,
30}
31
32/// Consumer represents a message consumer that subscribes to a topic and receives messages.
33/// It handles communication with the message broker and manages the consumer's state.
34#[derive(Debug)]
35pub struct Consumer {
36    // the Danube client
37    client: DanubeClient,
38    // the topic name, from where the messages are consumed
39    topic_name: String,
40    // the name of the Consumer
41    consumer_name: String,
42    // the map between the partitioned topic name and the consumer instance
43    consumers: HashMap<String, Arc<Mutex<TopicConsumer>>>,
44    // the name of the subscription the consumer is attached to
45    subscription: String,
46    // the type of the subscription, that can be Shared and Exclusive
47    subscription_type: SubType,
48    // other configurable options for the consumer
49    consumer_options: ConsumerOptions,
50    // shutdown flag and task handles for graceful close
51    shutdown: Arc<AtomicBool>,
52    task_handles: Vec<JoinHandle<()>>,
53}
54
55impl Consumer {
56    pub(crate) fn new(
57        client: DanubeClient,
58        topic_name: String,
59        consumer_name: String,
60        subscription: String,
61        sub_type: Option<SubType>,
62        consumer_options: ConsumerOptions,
63    ) -> Self {
64        let subscription_type = if let Some(sub_type) = sub_type {
65            sub_type
66        } else {
67            SubType::Shared
68        };
69
70        Consumer {
71            client,
72            topic_name,
73            consumer_name,
74            consumers: HashMap::new(),
75            subscription,
76            subscription_type,
77            consumer_options,
78            shutdown: Arc::new(AtomicBool::new(false)),
79            task_handles: Vec::new(),
80        }
81    }
82
83    /// Initializes the subscription to a non-partitioned or partitioned topic and starts the health check service.
84    ///
85    /// This function establishes a gRPC connection with the brokers and requests to subscribe to the specified topic.
86    ///
87    /// # Errors
88    /// If an error occurs during subscription or initialization, it is returned as part of the `Err` variant.
89    pub async fn subscribe(&mut self) -> Result<()> {
90        // Get partitions from the topic
91        let partitions = self
92            .client
93            .lookup_service
94            .topic_partitions(&self.client.uri, &self.topic_name)
95            .await?;
96
97        // Create TopicConsumer for each partition
98        let mut tasks = Vec::new();
99        for topic_partition in partitions {
100            let topic_name = topic_partition.clone();
101            let consumer_name = self.consumer_name.clone();
102            let subscription = self.subscription.clone();
103            let subscription_type = self.subscription_type.clone();
104            let consumer_options = self.consumer_options.clone();
105            let client = self.client.clone();
106
107            let task = tokio::spawn(async move {
108                let mut topic_consumer = TopicConsumer::new(
109                    client,
110                    topic_name,
111                    consumer_name,
112                    subscription,
113                    Some(subscription_type),
114                    consumer_options,
115                );
116                match topic_consumer.subscribe().await {
117                    Ok(_) => Ok(topic_consumer),
118                    Err(e) => Err(e),
119                }
120            });
121
122            tasks.push(task);
123        }
124
125        // Wait for all tasks to complete
126        let results = join_all(tasks).await;
127
128        // Collect results
129        let mut topic_consumers = HashMap::new();
130        for result in results {
131            match result {
132                Ok(Ok(consumer)) => {
133                    topic_consumers.insert(
134                        consumer.get_topic_name().to_string(),
135                        Arc::new(Mutex::new(consumer)),
136                    );
137                }
138                Ok(Err(e)) => return Err(e),
139                Err(e) => return Err(DanubeError::Unrecoverable(e.to_string())),
140            }
141        }
142
143        if topic_consumers.is_empty() {
144            return Err(DanubeError::Unrecoverable(
145                "No partitions found".to_string(),
146            ));
147        }
148
149        self.consumers.extend(topic_consumers.into_iter());
150        Ok(())
151    }
152
153    /// Starts receiving messages from the subscribed partitioned or non-partitioned topic.
154    ///
155    /// This function continuously polls for new messages and handles them as long as the `stop_signal` has not been set to `true`.
156    ///
157    /// # Returns
158    ///
159    /// A `Result` with:
160    /// - `Ok(mpsc::Receiver<StreamMessage>)` if the receive client is successfully created and ready to receive messages.
161    /// - `Err(e)` if the receive client cannot be created or if other issues occur.
162    pub async fn receive(&mut self) -> Result<mpsc::Receiver<StreamMessage>> {
163        // Create a channel to send messages to the client
164        let (tx, rx) = mpsc::channel(100); // Buffer size of 100, adjust as needed
165
166        // Create retry manager for consumers
167        let retry_manager = RetryManager::new(
168            self.consumer_options.max_retries,
169            self.consumer_options.base_backoff_ms,
170            self.consumer_options.max_backoff_ms,
171        );
172
173        // Spawn a task for each cloned TopicConsumer
174        for (_, consumer) in &self.consumers {
175            let tx = tx.clone();
176            let consumer = Arc::clone(consumer);
177            let retry_manager = retry_manager.clone();
178            let shutdown = self.shutdown.clone();
179
180            let handle: JoinHandle<()> = tokio::spawn(async move {
181                let mut attempts = 0;
182                let max_retries = if retry_manager.max_retries() == 0 {
183                    5
184                } else {
185                    retry_manager.max_retries()
186                };
187
188                loop {
189                    if shutdown.load(Ordering::SeqCst) {
190                        return;
191                    }
192                    // Try to get stream from consumer (subscribe is handled internally with its own retry)
193                    let stream_result = {
194                        let mut locked = consumer.lock().await;
195                        locked.receive().await
196                    };
197
198                    match stream_result {
199                        Ok(mut stream) => {
200                            attempts = 0; // Reset attempts on successful connection
201
202                            // Process messages until stream ends or errors
203                            while !shutdown.load(Ordering::SeqCst) {
204                                let message_opt = stream.next().await;
205                                if message_opt.is_none() {
206                                    break;
207                                }
208                                let message = message_opt.unwrap();
209                                match message {
210                                    Ok(stream_message) => {
211                                        let message: StreamMessage = stream_message.into();
212                                        if tx.send(message).await.is_err() {
213                                            // Channel is closed, exit the task
214                                            return;
215                                        }
216                                    }
217                                    Err(e) => {
218                                        eprintln!("Error receiving message: {}", e);
219                                        break; // Stream error, will retry receive
220                                    }
221                                }
222                            }
223                            // Stream ended, retry receive
224                        }
225                        Err(error) => {
226                            if shutdown.load(Ordering::SeqCst) {
227                                return;
228                            }
229                            // Check if this is an unrecoverable error (e.g., stream client not initialized)
230                            if matches!(error, DanubeError::Unrecoverable(_)) {
231                                eprintln!(
232                                    "Unrecoverable error detected, attempting resubscription: {:?}",
233                                    error
234                                );
235
236                                // Attempt to resubscribe for unrecoverable errors
237                                let resubscribe_result = {
238                                    let mut locked = consumer.lock().await;
239                                    locked.subscribe().await
240                                };
241
242                                match resubscribe_result {
243                                    Ok(_) => {
244                                        eprintln!("Resubscription successful after unrecoverable error, continuing...");
245                                        attempts = 0; // Reset attempts after successful resubscription
246                                        continue; // Go back to creating stream_result
247                                    }
248                                    Err(e) => {
249                                        eprintln!(
250                                            "Resubscription failed after unrecoverable error: {:?}",
251                                            e
252                                        );
253                                        return; // Exit task if resubscription fails
254                                    }
255                                }
256                            }
257
258                            // Failed to get stream, check if retryable
259                            if retry_manager.is_retryable_error(&error) {
260                                attempts += 1;
261                                if attempts > max_retries {
262                                    eprintln!("Max retries exceeded for consumer receive, attempting resubscription");
263
264                                    // Attempt to resubscribe
265                                    let resubscribe_result = {
266                                        let mut locked = consumer.lock().await;
267                                        locked.subscribe().await
268                                    };
269
270                                    match resubscribe_result {
271                                        Ok(_) => {
272                                            eprintln!("Resubscription successful, continuing...");
273                                            break; // Break out of retry loop and go back to creating stream_result
274                                        }
275                                        Err(e) => {
276                                            eprintln!("Resubscription failed: {:?}", e);
277                                            return; // Exit task if resubscription fails
278                                        }
279                                    }
280                                }
281                                let backoff = retry_manager.calculate_backoff(attempts - 1);
282                                tokio::time::sleep(backoff).await;
283                            } else {
284                                eprintln!("Non-retryable error in consumer receive: {:?}", error);
285                                return; // Non-retryable error
286                            }
287                        }
288                    }
289                }
290            });
291            self.task_handles.push(handle);
292        }
293
294        Ok(rx)
295    }
296
297    pub async fn ack(&mut self, message: &StreamMessage) -> Result<()> {
298        let topic_name = message.msg_id.topic_name.clone();
299        let topic_consumer = self.consumers.get_mut(&topic_name);
300        if let Some(topic_consumer) = topic_consumer {
301            let mut topic_consumer = topic_consumer.lock().await;
302            let _ = topic_consumer
303                .send_ack(
304                    message.request_id,
305                    message.msg_id.clone(),
306                    &self.subscription,
307                )
308                .await?;
309        }
310        Ok(())
311    }
312
313    /// Gracefully close all receive tasks and stop background activities for this consumer
314    pub async fn close(&mut self) {
315        // signal shutdown
316        self.shutdown.store(true, Ordering::SeqCst);
317        // stop topic-level activities (e.g., health checks)
318        for (_, topic_consumer) in self.consumers.iter() {
319            let locked = topic_consumer.lock().await;
320            locked.stop();
321        }
322        // abort receive tasks
323        for handle in self.task_handles.drain(..) {
324            handle.abort();
325        }
326        // small delay to allow server to observe closure
327        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
328    }
329}
330
331/// ConsumerBuilder is a builder for creating a new Consumer instance.
332///
333/// It allows setting various properties for the consumer such as topic, name, subscription,
334/// subscription type, and options.
335#[derive(Debug, Clone)]
336pub struct ConsumerBuilder {
337    client: DanubeClient,
338    topic: Option<String>,
339    consumer_name: Option<String>,
340    subscription: Option<String>,
341    subscription_type: Option<SubType>,
342    consumer_options: ConsumerOptions,
343}
344
345impl ConsumerBuilder {
346    pub fn new(client: &DanubeClient) -> Self {
347        ConsumerBuilder {
348            client: client.clone(),
349            topic: None,
350            consumer_name: None,
351            subscription: None,
352            subscription_type: None,
353            consumer_options: ConsumerOptions::default(),
354        }
355    }
356
357    /// Sets the topic name for the consumer.
358    ///
359    /// This method specifies the topic that the consumer will subscribe to. It is a required field and must be set before the consumer can be created.
360    ///
361    /// # Parameters
362    ///
363    /// - `topic`: The name of the topic for the consumer. This should be a non-empty string that corresponds to an existing topic.
364    pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
365        self.topic = Some(topic.into());
366        self
367    }
368
369    /// Sets the name of the consumer instance.
370    ///
371    /// This method specifies the name to be assigned to the consumer. It is a required field and must be set before the consumer can be created.
372    ///
373    /// # Parameters
374    ///
375    /// - `consumer_name`: The name for the consumer instance. This should be a non-empty string that uniquely identifies the consumer.
376    pub fn with_consumer_name(mut self, consumer_name: impl Into<String>) -> Self {
377        self.consumer_name = Some(consumer_name.into());
378        self
379    }
380
381    /// Sets the name of the subscription for the consumer.
382    ///
383    /// This method specifies the subscription that the consumer will use. It is a required field and must be set before the consumer can be created.
384    ///
385    /// # Parameters
386    ///
387    /// - `subscription_name`: The name of the subscription. This should be a non-empty string that identifies the subscription to which the consumer will be subscribed.
388    pub fn with_subscription(mut self, subscription_name: impl Into<String>) -> Self {
389        self.subscription = Some(subscription_name.into());
390        self
391    }
392
393    /// Sets the type of subscription for the consumer. This field is optional.
394    ///
395    /// This method specifies the type of subscription that the consumer will use. The subscription type determines how messages are distributed to consumers that share the same subscription.
396    ///
397    /// # Parameters
398    ///
399    /// - `sub_type`: The type of subscription. This should be one of the following:
400    ///   - `SubType::Exclusive`: The consumer exclusively receives all messages for the subscription.
401    ///   - `SubType::Shared`: Messages are distributed among multiple consumers sharing the same subscription. Default if not specified.
402    ///   - `SubType::FailOver`: Only one consumer receives messages, and if it fails, another consumer takes over.
403    pub fn with_subscription_type(mut self, subscription_type: SubType) -> Self {
404        self.subscription_type = Some(subscription_type);
405        self
406    }
407
408    /// Creates a new `Consumer` instance using the settings configured in the `ConsumerBuilder`.
409    ///
410    /// This method performs validation to ensure that all required fields are set before creating the `Consumer`.  Once validation is successful, it constructs and returns a new `Consumer` instance configured with the specified settings.
411    ///
412    /// # Returns
413    ///
414    /// -  A `Consumer` instance if the builder configuration is valid and the consumer is created successfully.
415    pub fn build(self) -> Consumer {
416        let topic = self.topic.expect("you should specify the topic");
417        let consumer_name = self
418            .consumer_name
419            .expect("you should provide a name for the consumer");
420        let subscription = self
421            .subscription
422            .expect("you should provide the name of the subscription");
423        Consumer::new(
424            self.client,
425            topic,
426            consumer_name,
427            subscription,
428            self.subscription_type,
429            self.consumer_options,
430        )
431    }
432}
433
434/// Configuration options for consumers
435#[derive(Debug, Clone, Default)]
436pub struct ConsumerOptions {
437    // Reserved for future use
438    pub others: String,
439    // Maximum number of retry attempts
440    pub max_retries: usize,
441    // Base backoff in milliseconds for exponential backoff
442    pub base_backoff_ms: u64,
443    // Maximum backoff cap in milliseconds
444    pub max_backoff_ms: u64,
445}