danube_client/
consumer.rs

1use crate::{
2    errors::{DanubeError, Result},
3    retry_manager::RetryManager,
4    topic_consumer::TopicConsumer,
5    DanubeClient,
6};
7
8use danube_core::message::StreamMessage;
9use futures::{future::join_all, StreamExt};
10use std::collections::HashMap;
11use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
12use tokio::sync::{mpsc, Mutex};
13use tokio::task::JoinHandle;
14
/// Represents the type of subscription.
///
/// Variants:
/// - `Exclusive`: Only one consumer can subscribe to the topic at a time.
/// - `Shared`: Multiple consumers can subscribe to the topic concurrently.
/// - `FailOver`: Multiple consumers can subscribe, but only one is active at a time;
///   the others wait in standby and take over if the active consumer disconnects.
///
/// When no subscription type is supplied, `Consumer::new` falls back to `Shared`.
#[derive(Debug, Clone)]
pub enum SubType {
    /// A single consumer owns the subscription.
    Exclusive,
    /// Several consumers share the subscription concurrently.
    Shared,
    /// One active consumer, with the remaining subscribers kept in standby.
    FailOver,
}
28
/// Consumer represents a message consumer that subscribes to a topic and receives messages.
/// It handles communication with the message broker and manages the consumer's state.
///
/// A `Consumer` owns one `TopicConsumer` per topic partition (filled in by `subscribe`)
/// plus the background tasks spawned by `receive`, which are stopped by `close`.
#[derive(Debug)]
pub struct Consumer {
    // the Danube client
    client: DanubeClient,
    // the topic name, from where the messages are consumed
    topic_name: String,
    // the name of the Consumer
    consumer_name: String,
    // the map between the partitioned topic name and the consumer instance;
    // empty until `subscribe` succeeds
    consumers: HashMap<String, Arc<Mutex<TopicConsumer>>>,
    // the name of the subscription the consumer is attached to
    subscription: String,
    // the type of the subscription: Exclusive, Shared or FailOver
    // (Shared is used when none is provided to `new`)
    subscription_type: SubType,
    // other configurable options for the consumer (retry count and backoff bounds)
    consumer_options: ConsumerOptions,
    // shutdown flag: set to true by `close` and polled by the receive tasks
    shutdown: Arc<AtomicBool>,
    // handles of the tasks spawned by `receive`; aborted and drained by `close`
    task_handles: Vec<JoinHandle<()>>,
}
51
52impl Consumer {
53    pub(crate) fn new(
54        client: DanubeClient,
55        topic_name: String,
56        consumer_name: String,
57        subscription: String,
58        sub_type: Option<SubType>,
59        consumer_options: ConsumerOptions,
60    ) -> Self {
61        let subscription_type = if let Some(sub_type) = sub_type {
62            sub_type
63        } else {
64            SubType::Shared
65        };
66
67        Consumer {
68            client,
69            topic_name,
70            consumer_name,
71            consumers: HashMap::new(),
72            subscription,
73            subscription_type,
74            consumer_options,
75            shutdown: Arc::new(AtomicBool::new(false)),
76            task_handles: Vec::new(),
77        }
78    }
79
80    /// Initializes the subscription to a non-partitioned or partitioned topic and starts the health check service.
81    ///
82    /// This function establishes a gRPC connection with the brokers and requests to subscribe to the specified topic.
83    ///
84    /// # Errors
85    /// If an error occurs during subscription or initialization, it is returned as part of the `Err` variant.
86    pub async fn subscribe(&mut self) -> Result<()> {
87        // Get partitions from the topic
88        let partitions = self
89            .client
90            .lookup_service
91            .topic_partitions(&self.client.uri, &self.topic_name)
92            .await?;
93
94        // Create TopicConsumer for each partition
95        let mut tasks = Vec::new();
96        for topic_partition in partitions {
97            let topic_name = topic_partition.clone();
98            let consumer_name = self.consumer_name.clone();
99            let subscription = self.subscription.clone();
100            let subscription_type = self.subscription_type.clone();
101            let consumer_options = self.consumer_options.clone();
102            let client = self.client.clone();
103
104            let task = tokio::spawn(async move {
105                let mut topic_consumer = TopicConsumer::new(
106                    client,
107                    topic_name,
108                    consumer_name,
109                    subscription,
110                    Some(subscription_type),
111                    consumer_options,
112                );
113                match topic_consumer.subscribe().await {
114                    Ok(_) => Ok(topic_consumer),
115                    Err(e) => Err(e),
116                }
117            });
118
119            tasks.push(task);
120        }
121
122        // Wait for all tasks to complete
123        let results = join_all(tasks).await;
124
125        // Collect results
126        let mut topic_consumers = HashMap::new();
127        for result in results {
128            match result {
129                Ok(Ok(consumer)) => {
130                    topic_consumers.insert(
131                        consumer.get_topic_name().to_string(),
132                        Arc::new(Mutex::new(consumer)),
133                    );
134                }
135                Ok(Err(e)) => return Err(e),
136                Err(e) => return Err(DanubeError::Unrecoverable(e.to_string())),
137            }
138        }
139
140        if topic_consumers.is_empty() {
141            return Err(DanubeError::Unrecoverable(
142                "No partitions found".to_string(),
143            ));
144        }
145
146        self.consumers.extend(topic_consumers.into_iter());
147        Ok(())
148    }
149
150    /// Starts receiving messages from the subscribed partitioned or non-partitioned topic.
151    ///
152    /// This function continuously polls for new messages and handles them as long as the `stop_signal` has not been set to `true`.
153    ///
154    /// # Returns
155    ///
156    /// A `Result` with:
157    /// - `Ok(mpsc::Receiver<StreamMessage>)` if the receive client is successfully created and ready to receive messages.
158    /// - `Err(e)` if the receive client cannot be created or if other issues occur.
159    pub async fn receive(&mut self) -> Result<mpsc::Receiver<StreamMessage>> {
160        // Create a channel to send messages to the client
161        let (tx, rx) = mpsc::channel(100); // Buffer size of 100, adjust as needed
162
163        // Create retry manager for consumers
164        let retry_manager = RetryManager::new(
165            self.consumer_options.max_retries,
166            self.consumer_options.base_backoff_ms,
167            self.consumer_options.max_backoff_ms,
168        );
169
170        // Spawn a task for each cloned TopicConsumer
171        for (_, consumer) in &self.consumers {
172            let tx = tx.clone();
173            let consumer = Arc::clone(consumer);
174            let retry_manager = retry_manager.clone();
175            let shutdown = self.shutdown.clone();
176
177            let handle: JoinHandle<()> = tokio::spawn(async move {
178                let mut attempts = 0;
179                let max_retries = if retry_manager.max_retries() == 0 {
180                    5
181                } else {
182                    retry_manager.max_retries()
183                };
184
185                loop {
186                    if shutdown.load(Ordering::SeqCst) { return; }
187                    // Try to get stream from consumer (subscribe is handled internally with its own retry)
188                    let stream_result = {
189                        let mut locked = consumer.lock().await;
190                        locked.receive().await
191                    };
192
193                    match stream_result {
194                        Ok(mut stream) => {
195                            attempts = 0; // Reset attempts on successful connection
196
197                            // Process messages until stream ends or errors
198                            while !shutdown.load(Ordering::SeqCst) {
199                                let message_opt = stream.next().await;
200                                if message_opt.is_none() { break; }
201                                let message = message_opt.unwrap();
202                                match message {
203                                    Ok(stream_message) => {
204                                        let message: StreamMessage = stream_message.into();
205                                        if tx.send(message).await.is_err() {
206                                            // Channel is closed, exit the task
207                                            return;
208                                        }
209                                    }
210                                    Err(e) => {
211                                        eprintln!("Error receiving message: {}", e);
212                                        break; // Stream error, will retry receive
213                                    }
214                                }
215                            }
216                            // Stream ended, retry receive
217                        }
218                        Err(error) => {
219                            if shutdown.load(Ordering::SeqCst) { return; }
220                            // Check if this is an unrecoverable error (e.g., stream client not initialized)
221                            if matches!(error, DanubeError::Unrecoverable(_)) {
222                                eprintln!("Unrecoverable error detected, attempting resubscription: {:?}", error);
223                                
224                                // Attempt to resubscribe for unrecoverable errors
225                                let resubscribe_result = {
226                                    let mut locked = consumer.lock().await;
227                                    locked.subscribe().await
228                                };
229                                
230                                match resubscribe_result {
231                                    Ok(_) => {
232                                        eprintln!("Resubscription successful after unrecoverable error, continuing...");
233                                        attempts = 0; // Reset attempts after successful resubscription
234                                        continue; // Go back to creating stream_result
235                                    }
236                                    Err(e) => {
237                                        eprintln!("Resubscription failed after unrecoverable error: {:?}", e);
238                                        return; // Exit task if resubscription fails
239                                    }
240                                }
241                            }
242                            
243                            // Failed to get stream, check if retryable
244                            if retry_manager.is_retryable_error(&error) {
245                                attempts += 1;
246                                if attempts > max_retries {
247                                    eprintln!("Max retries exceeded for consumer receive, attempting resubscription");
248                                    
249                                    // Attempt to resubscribe
250                                    let resubscribe_result = {
251                                        let mut locked = consumer.lock().await;
252                                        locked.subscribe().await
253                                    };
254                                    
255                                    match resubscribe_result {
256                                        Ok(_) => {
257                                            eprintln!("Resubscription successful, continuing...");
258                                            break; // Break out of retry loop and go back to creating stream_result
259                                        }
260                                        Err(e) => {
261                                            eprintln!("Resubscription failed: {:?}", e);
262                                            return; // Exit task if resubscription fails
263                                        }
264                                    }
265                                }
266                                let backoff = retry_manager.calculate_backoff(attempts - 1);
267                                tokio::time::sleep(backoff).await;
268                            } else {
269                                eprintln!("Non-retryable error in consumer receive: {:?}", error);
270                                return; // Non-retryable error
271                            }
272                        }
273                    }
274                }
275            });
276            self.task_handles.push(handle);
277        }
278
279        Ok(rx)
280    }
281
282    pub async fn ack(&mut self, message: &StreamMessage) -> Result<()> {
283        let topic_name = message.msg_id.topic_name.clone();
284        let topic_consumer = self.consumers.get_mut(&topic_name);
285        if let Some(topic_consumer) = topic_consumer {
286            let mut topic_consumer = topic_consumer.lock().await;
287            let _ = topic_consumer
288                .send_ack(
289                    message.request_id,
290                    message.msg_id.clone(),
291                    &self.subscription,
292                )
293                .await?;
294        }
295        Ok(())
296    }
297
298    /// Gracefully close all receive tasks and stop background activities for this consumer
299    pub async fn close(&mut self) {
300        // signal shutdown
301        self.shutdown.store(true, Ordering::SeqCst);
302        // stop topic-level activities (e.g., health checks)
303        for (_, topic_consumer) in self.consumers.iter() {
304            let locked = topic_consumer.lock().await;
305            locked.stop();
306        }
307        // abort receive tasks
308        for handle in self.task_handles.drain(..) {
309            handle.abort();
310        }
311        // small delay to allow server to observe closure
312        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
313    }
314}
315
/// ConsumerBuilder is a builder for creating a new Consumer instance.
///
/// It collects the topic, consumer name, subscription name, subscription type and
/// consumer options, and hands them to `Consumer::new` when `build` is called.
#[derive(Debug, Clone)]
pub struct ConsumerBuilder {
    // clone of the Danube client the consumer will use
    client: DanubeClient,
    // topic to consume from; required by `build`
    topic: Option<String>,
    // name of the consumer; required by `build`
    consumer_name: Option<String>,
    // name of the subscription; required by `build`
    subscription: Option<String>,
    // subscription type; optional, passed to `Consumer::new` as-is
    subscription_type: Option<SubType>,
    // consumer options; initialised to `ConsumerOptions::default()` by `new`
    consumer_options: ConsumerOptions,
}
329
330impl ConsumerBuilder {
331    pub fn new(client: &DanubeClient) -> Self {
332        ConsumerBuilder {
333            client: client.clone(),
334            topic: None,
335            consumer_name: None,
336            subscription: None,
337            subscription_type: None,
338            consumer_options: ConsumerOptions::default(),
339        }
340    }
341
342    /// Sets the topic name for the consumer.
343    ///
344    /// This method specifies the topic that the consumer will subscribe to. It is a required field and must be set before the consumer can be created.
345    ///
346    /// # Parameters
347    ///
348    /// - `topic`: The name of the topic for the consumer. This should be a non-empty string that corresponds to an existing topic.
349    pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
350        self.topic = Some(topic.into());
351        self
352    }
353
354    /// Sets the name of the consumer instance.
355    ///
356    /// This method specifies the name to be assigned to the consumer. It is a required field and must be set before the consumer can be created.
357    ///
358    /// # Parameters
359    ///
360    /// - `consumer_name`: The name for the consumer instance. This should be a non-empty string that uniquely identifies the consumer.
361    pub fn with_consumer_name(mut self, consumer_name: impl Into<String>) -> Self {
362        self.consumer_name = Some(consumer_name.into());
363        self
364    }
365
366    /// Sets the name of the subscription for the consumer.
367    ///
368    /// This method specifies the subscription that the consumer will use. It is a required field and must be set before the consumer can be created.
369    ///
370    /// # Parameters
371    ///
372    /// - `subscription_name`: The name of the subscription. This should be a non-empty string that identifies the subscription to which the consumer will be subscribed.
373    pub fn with_subscription(mut self, subscription_name: impl Into<String>) -> Self {
374        self.subscription = Some(subscription_name.into());
375        self
376    }
377
378    /// Sets the type of subscription for the consumer. This field is optional.
379    ///
380    /// This method specifies the type of subscription that the consumer will use. The subscription type determines how messages are distributed to consumers that share the same subscription.
381    ///
382    /// # Parameters
383    ///
384    /// - `sub_type`: The type of subscription. This should be one of the following:
385    ///   - `SubType::Exclusive`: The consumer exclusively receives all messages for the subscription.
386    ///   - `SubType::Shared`: Messages are distributed among multiple consumers sharing the same subscription. Default if not specified.
387    ///   - `SubType::FailOver`: Only one consumer receives messages, and if it fails, another consumer takes over.
388    pub fn with_subscription_type(mut self, subscription_type: SubType) -> Self {
389        self.subscription_type = Some(subscription_type);
390        self
391    }
392
393    /// Creates a new `Consumer` instance using the settings configured in the `ConsumerBuilder`.
394    ///
395    /// This method performs validation to ensure that all required fields are set before creating the `Consumer`.  Once validation is successful, it constructs and returns a new `Consumer` instance configured with the specified settings.
396    ///
397    /// # Returns
398    ///
399    /// -  A `Consumer` instance if the builder configuration is valid and the consumer is created successfully.
400    pub fn build(self) -> Consumer {
401        let topic = self.topic.expect("you should specify the topic");
402        let consumer_name = self
403            .consumer_name
404            .expect("you should provide a name for the consumer");
405        let subscription = self
406            .subscription
407            .expect("you should provide the name of the subscription");
408        Consumer::new(
409            self.client,
410            topic,
411            consumer_name,
412            subscription,
413            self.subscription_type,
414            self.consumer_options,
415        )
416    }
417}
418
/// Configuration options for consumers.
///
/// The derived `Default` yields an empty `others` string and `0` for every numeric field.
#[derive(Debug, Clone, Default)]
pub struct ConsumerOptions {
    /// Reserved for future use.
    pub others: String,
    /// Maximum number of retry attempts. `Consumer::receive` treats `0` as 5.
    pub max_retries: usize,
    /// Base backoff in milliseconds for exponential backoff (passed to `RetryManager::new`).
    pub base_backoff_ms: u64,
    /// Maximum backoff cap in milliseconds (passed to `RetryManager::new`).
    pub max_backoff_ms: u64,
}