Skip to main content

danube_client/
consumer.rs

1use crate::{
2    errors::{DanubeError, Result},
3    retry_manager::RetryManager,
4    topic_consumer::TopicConsumer,
5    DanubeClient,
6};
7
8use danube_core::message::StreamMessage;
9use futures::{future::join_all, StreamExt};
10use std::collections::HashMap;
11use std::sync::{
12    atomic::{AtomicBool, Ordering},
13    Arc,
14};
15use tokio::sync::{mpsc, Mutex};
16use tokio::task::JoinHandle;
17use tracing::{error, info, warn};
18
/// Capacity (in messages) of the bounded channel that carries messages from the
/// per-partition receive tasks to the receiver handed out by `Consumer::receive`.
const RECEIVE_CHANNEL_BUFFER: usize = 100;
/// Delay, in milliseconds, that `Consumer::close` sleeps after signaling shutdown
/// and aborting the receive tasks, to allow the broker to observe closure.
const GRACEFUL_CLOSE_DELAY_MS: u64 = 100;
23
/// The type of subscription a consumer attaches to a topic with.
///
/// Variants:
/// - `Exclusive`: only one consumer can subscribe to the topic at a time.
/// - `Shared`: multiple consumers can subscribe to the topic concurrently.
/// - `FailOver`: only one consumer is active at a time; additional consumers may
///   subscribe but wait in standby and take over if the active consumer disconnects.
///
/// The type is comparable (`PartialEq`/`Eq`) so callers can check which
/// subscription type is configured.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SubType {
    Exclusive,
    Shared,
    FailOver,
}
37
/// Consumer represents a message consumer that subscribes to a topic and receives messages.
/// It handles communication with the message broker and manages the consumer's state.
///
/// Typical flow, as implemented by the methods on this type: `subscribe` creates one
/// `TopicConsumer` per partition, `receive` spawns one forwarding task per partition and
/// returns the receiving end of a channel, `ack` acknowledges a message, and `close`
/// stops everything.
#[derive(Debug)]
pub struct Consumer {
    // the Danube client, used for partition lookup and cloned into each TopicConsumer
    client: DanubeClient,
    // the topic name, from where the messages are consumed
    topic_name: String,
    // the name of the Consumer
    consumer_name: String,
    // the map between the partitioned topic name and the consumer instance;
    // empty until `subscribe` succeeds
    consumers: HashMap<String, Arc<Mutex<TopicConsumer>>>,
    // the name of the subscription the consumer is attached to
    subscription: String,
    // the type of the subscription: Exclusive, Shared or FailOver
    // (`Consumer::new` falls back to Shared when none is given)
    subscription_type: SubType,
    // other configurable options for the consumer (retry / backoff settings)
    consumer_options: ConsumerOptions,
    // shutdown flag shared with the receive tasks; set to true by `close`
    shutdown: Arc<AtomicBool>,
    // handles of the tasks spawned by `receive`; aborted and drained by `close`
    task_handles: Vec<JoinHandle<()>>,
}
60
61impl Consumer {
62    pub(crate) fn new(
63        client: DanubeClient,
64        topic_name: String,
65        consumer_name: String,
66        subscription: String,
67        sub_type: Option<SubType>,
68        consumer_options: ConsumerOptions,
69    ) -> Self {
70        let subscription_type = sub_type.unwrap_or(SubType::Shared);
71
72        Consumer {
73            client,
74            topic_name,
75            consumer_name,
76            consumers: HashMap::new(),
77            subscription,
78            subscription_type,
79            consumer_options,
80            shutdown: Arc::new(AtomicBool::new(false)),
81            task_handles: Vec::new(),
82        }
83    }
84
85    /// Initializes the subscription to a non-partitioned or partitioned topic and starts the health check service.
86    ///
87    /// This function establishes a gRPC connection with the brokers and requests to subscribe to the specified topic.
88    ///
89    /// # Errors
90    /// If an error occurs during subscription or initialization, it is returned as part of the `Err` variant.
91    pub async fn subscribe(&mut self) -> Result<()> {
92        // Get partitions from the topic
93        let partitions = self
94            .client
95            .lookup_service
96            .topic_partitions(&self.client.uri, &self.topic_name)
97            .await?;
98
99        // Create TopicConsumer for each partition
100        let mut tasks = Vec::new();
101        for topic_partition in partitions {
102            let topic_name = topic_partition.clone();
103            let consumer_name = self.consumer_name.clone();
104            let subscription = self.subscription.clone();
105            let subscription_type = self.subscription_type.clone();
106            let consumer_options = self.consumer_options.clone();
107            let client = self.client.clone();
108
109            let task = tokio::spawn(async move {
110                let mut topic_consumer = TopicConsumer::new(
111                    client,
112                    topic_name,
113                    consumer_name,
114                    subscription,
115                    Some(subscription_type),
116                    consumer_options,
117                );
118                match topic_consumer.subscribe().await {
119                    Ok(_) => Ok(topic_consumer),
120                    Err(e) => Err(e),
121                }
122            });
123
124            tasks.push(task);
125        }
126
127        // Wait for all tasks to complete
128        let results = join_all(tasks).await;
129
130        // Collect results
131        let mut topic_consumers = HashMap::new();
132        for result in results {
133            match result {
134                Ok(Ok(consumer)) => {
135                    topic_consumers.insert(
136                        consumer.get_topic_name().to_string(),
137                        Arc::new(Mutex::new(consumer)),
138                    );
139                }
140                Ok(Err(e)) => return Err(e),
141                Err(e) => return Err(DanubeError::Unrecoverable(e.to_string())),
142            }
143        }
144
145        if topic_consumers.is_empty() {
146            return Err(DanubeError::Unrecoverable(
147                "No partitions found".to_string(),
148            ));
149        }
150
151        self.consumers.extend(topic_consumers.into_iter());
152        Ok(())
153    }
154
155    /// Starts receiving messages from the subscribed partitioned or non-partitioned topic.
156    ///
157    /// This function continuously polls for new messages and handles them as long as the `stop_signal` has not been set to `true`.
158    ///
159    /// # Returns
160    ///
161    /// A `Result` with:
162    /// - `Ok(mpsc::Receiver<StreamMessage>)` if the receive client is successfully created and ready to receive messages.
163    /// - `Err(e)` if the receive client cannot be created or if other issues occur.
164    pub async fn receive(&mut self) -> Result<mpsc::Receiver<StreamMessage>> {
165        let (tx, rx) = mpsc::channel(RECEIVE_CHANNEL_BUFFER);
166
167        let retry_manager = RetryManager::new(
168            self.consumer_options.max_retries,
169            self.consumer_options.base_backoff_ms,
170            self.consumer_options.max_backoff_ms,
171        );
172
173        for (_, consumer) in &self.consumers {
174            let broker_stop = {
175                let locked = consumer.lock().await;
176                Arc::clone(&locked.stop_signal)
177            };
178            let handle = tokio::spawn(partition_receive_loop(
179                Arc::clone(consumer),
180                tx.clone(),
181                retry_manager.clone(),
182                self.shutdown.clone(),
183                broker_stop,
184            ));
185            self.task_handles.push(handle);
186        }
187
188        Ok(rx)
189    }
190
191    pub async fn ack(&mut self, message: &StreamMessage) -> Result<()> {
192        let topic_name = message.msg_id.topic_name.clone();
193        let topic_consumer = self.consumers.get_mut(&topic_name);
194        if let Some(topic_consumer) = topic_consumer {
195            let mut topic_consumer = topic_consumer.lock().await;
196            let _ = topic_consumer
197                .send_ack(
198                    message.request_id,
199                    message.msg_id.clone(),
200                    &self.subscription,
201                )
202                .await?;
203        }
204        Ok(())
205    }
206
207    /// Gracefully close all receive tasks and stop background activities for this consumer
208    pub async fn close(&mut self) {
209        // signal shutdown
210        self.shutdown.store(true, Ordering::SeqCst);
211        // stop topic-level activities (e.g., health checks)
212        for (_, topic_consumer) in self.consumers.iter() {
213            let locked = topic_consumer.lock().await;
214            locked.stop();
215        }
216        // abort receive tasks
217        for handle in self.task_handles.drain(..) {
218            handle.abort();
219        }
220        // small delay to allow server to observe closure
221        tokio::time::sleep(std::time::Duration::from_millis(GRACEFUL_CLOSE_DELAY_MS)).await;
222    }
223}
224
225/// Per-partition receive loop: opens a message stream, forwards messages to `tx`,
226/// and handles retries / resubscription on errors.
227async fn partition_receive_loop(
228    consumer: Arc<Mutex<TopicConsumer>>,
229    tx: mpsc::Sender<StreamMessage>,
230    retry_manager: RetryManager,
231    shutdown: Arc<AtomicBool>,
232    broker_stop: Arc<AtomicBool>,
233) {
234    let mut attempts = 0;
235
236    loop {
237        if shutdown.load(Ordering::SeqCst) {
238            return;
239        }
240
241        let stream_result = {
242            let mut locked = consumer.lock().await;
243            locked.receive().await
244        };
245
246        match stream_result {
247            Ok(mut stream) => {
248                attempts = 0;
249
250                while !shutdown.load(Ordering::SeqCst) && !broker_stop.load(Ordering::Relaxed) {
251                    match stream.next().await {
252                        Some(Ok(stream_message)) => {
253                            let message: StreamMessage = stream_message.into();
254                            if tx.send(message).await.is_err() {
255                                return; // Channel closed
256                            }
257                        }
258                        Some(Err(e)) => {
259                            warn!(error = %e, "error receiving message");
260                            break; // Stream error, will retry
261                        }
262                        None => break, // Stream ended, will retry
263                    }
264                }
265
266                if shutdown.load(Ordering::SeqCst) {
267                    return;
268                }
269
270                // Broker signaled Close via health check — resubscribe immediately
271                if broker_stop.load(Ordering::Relaxed) {
272                    broker_stop.store(false, Ordering::Relaxed);
273                    warn!("broker signaled topic close, triggering resubscription");
274                    match resubscribe(&consumer).await {
275                        Ok(_) => {
276                            info!("resubscription successful after broker close signal");
277                            continue;
278                        }
279                        Err(e) => {
280                            error!(error = ?e, "resubscription failed after broker close signal");
281                            return;
282                        }
283                    }
284                }
285            }
286
287            // Unrecoverable: attempt resubscription
288            Err(ref error) if matches!(error, DanubeError::Unrecoverable(_)) => {
289                if shutdown.load(Ordering::SeqCst) {
290                    return;
291                }
292                warn!(error = ?error, "unrecoverable error, attempting resubscription");
293                match resubscribe(&consumer).await {
294                    Ok(_) => {
295                        info!("resubscription successful after unrecoverable error");
296                        attempts = 0;
297                        continue;
298                    }
299                    Err(e) => {
300                        error!(error = ?e, "resubscription failed after unrecoverable error");
301                        return;
302                    }
303                }
304            }
305
306            // Retryable: backoff, then escalate to resubscription after max retries
307            Err(error) if retry_manager.is_retryable_error(&error) => {
308                if shutdown.load(Ordering::SeqCst) {
309                    return;
310                }
311                attempts += 1;
312                if attempts > retry_manager.max_retries() {
313                    warn!("max retries exceeded, attempting resubscription");
314                    match resubscribe(&consumer).await {
315                        Ok(_) => {
316                            info!("resubscription successful");
317                            attempts = 0;
318                            continue;
319                        }
320                        Err(e) => {
321                            error!(error = ?e, "resubscription failed");
322                            return;
323                        }
324                    }
325                }
326                let backoff = retry_manager.calculate_backoff(attempts - 1);
327                tokio::time::sleep(backoff).await;
328            }
329
330            // Non-retryable: bail
331            Err(error) => {
332                error!(error = ?error, "non-retryable error in consumer receive");
333                return;
334            }
335        }
336    }
337}
338
339/// Resubscribe a topic consumer (e.g., after an unrecoverable error or max retries exceeded).
340async fn resubscribe(consumer: &Arc<Mutex<TopicConsumer>>) -> Result<()> {
341    let mut locked = consumer.lock().await;
342    locked.subscribe().await?;
343    Ok(())
344}
345
/// ConsumerBuilder is a builder for creating a new Consumer instance.
///
/// It allows setting various properties for the consumer such as topic, name, subscription,
/// subscription type, and options.
///
/// `topic`, `consumer_name` and `subscription` must be set before calling `build`;
/// `subscription_type` is optional.
#[derive(Debug, Clone)]
pub struct ConsumerBuilder {
    // clone of the client the built Consumer will use
    client: DanubeClient,
    // topic to consume from; required by `build`
    topic: Option<String>,
    // name of the consumer; required by `build`
    consumer_name: Option<String>,
    // name of the subscription; required by `build`
    subscription: Option<String>,
    // subscription type; when left as None, `Consumer::new` falls back to Shared
    subscription_type: Option<SubType>,
    // retry / backoff options; starts as `ConsumerOptions::default()`
    consumer_options: ConsumerOptions,
}
359
360impl ConsumerBuilder {
361    pub fn new(client: &DanubeClient) -> Self {
362        ConsumerBuilder {
363            client: client.clone(),
364            topic: None,
365            consumer_name: None,
366            subscription: None,
367            subscription_type: None,
368            consumer_options: ConsumerOptions::default(),
369        }
370    }
371
372    /// Sets the topic name for the consumer.
373    ///
374    /// This method specifies the topic that the consumer will subscribe to. It is a required field and must be set before the consumer can be created.
375    ///
376    /// # Parameters
377    ///
378    /// - `topic`: The name of the topic for the consumer. This should be a non-empty string that corresponds to an existing topic.
379    pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
380        self.topic = Some(topic.into());
381        self
382    }
383
384    /// Sets the name of the consumer instance.
385    ///
386    /// This method specifies the name to be assigned to the consumer. It is a required field and must be set before the consumer can be created.
387    ///
388    /// # Parameters
389    ///
390    /// - `consumer_name`: The name for the consumer instance. This should be a non-empty string that uniquely identifies the consumer.
391    pub fn with_consumer_name(mut self, consumer_name: impl Into<String>) -> Self {
392        self.consumer_name = Some(consumer_name.into());
393        self
394    }
395
396    /// Sets the name of the subscription for the consumer.
397    ///
398    /// This method specifies the subscription that the consumer will use. It is a required field and must be set before the consumer can be created.
399    ///
400    /// # Parameters
401    ///
402    /// - `subscription_name`: The name of the subscription. This should be a non-empty string that identifies the subscription to which the consumer will be subscribed.
403    pub fn with_subscription(mut self, subscription_name: impl Into<String>) -> Self {
404        self.subscription = Some(subscription_name.into());
405        self
406    }
407
408    /// Sets the type of subscription for the consumer. This field is optional.
409    ///
410    /// This method specifies the type of subscription that the consumer will use. The subscription type determines how messages are distributed to consumers that share the same subscription.
411    ///
412    /// # Parameters
413    ///
414    /// - `sub_type`: The type of subscription. This should be one of the following:
415    ///   - `SubType::Exclusive`: The consumer exclusively receives all messages for the subscription.
416    ///   - `SubType::Shared`: Messages are distributed among multiple consumers sharing the same subscription. Default if not specified.
417    ///   - `SubType::FailOver`: Only one consumer receives messages, and if it fails, another consumer takes over.
418    pub fn with_subscription_type(mut self, subscription_type: SubType) -> Self {
419        self.subscription_type = Some(subscription_type);
420        self
421    }
422
423    /// Creates a new `Consumer` instance using the settings configured in the `ConsumerBuilder`.
424    ///
425    /// This method performs validation to ensure that all required fields are set before creating the `Consumer`.  Once validation is successful, it constructs and returns a new `Consumer` instance configured with the specified settings.
426    ///
427    /// # Returns
428    ///
429    /// -  A `Consumer` instance if the builder configuration is valid and the consumer is created successfully.
430    pub fn build(self) -> Result<Consumer> {
431        let topic = self.topic.ok_or_else(|| {
432            DanubeError::Unrecoverable("topic is required to build a Consumer".into())
433        })?;
434        let consumer_name = self.consumer_name.ok_or_else(|| {
435            DanubeError::Unrecoverable("consumer name is required to build a Consumer".into())
436        })?;
437        let subscription = self.subscription.ok_or_else(|| {
438            DanubeError::Unrecoverable("subscription is required to build a Consumer".into())
439        })?;
440        Ok(Consumer::new(
441            self.client,
442            topic,
443            consumer_name,
444            subscription,
445            self.subscription_type,
446            self.consumer_options,
447        ))
448    }
449}
450
/// Configuration options for consumers.
///
/// These values are passed to `RetryManager::new` when `Consumer::receive` is called.
/// The derived `Default` sets every field to `0`.
/// NOTE(review): how `RetryManager` interprets zero values is not visible here — confirm.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate cannot build it with a
/// struct literal; start from `ConsumerOptions::default()` and assign the public fields.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ConsumerOptions {
    /// Maximum number of retry attempts
    pub max_retries: usize,
    /// Base backoff in milliseconds for exponential backoff
    pub base_backoff_ms: u64,
    /// Maximum backoff cap in milliseconds
    pub max_backoff_ms: u64,
}