Skip to main content

danube_client/
consumer.rs

1use crate::{
2    errors::{DanubeError, Result},
3    retry_manager::RetryManager,
4    topic_consumer::TopicConsumer,
5    DanubeClient,
6};
7
8use danube_core::message::StreamMessage;
9use futures::{future::join_all, StreamExt};
10use std::collections::HashMap;
11use std::sync::{
12    atomic::{AtomicBool, Ordering},
13    Arc,
14};
15use tokio::sync::{mpsc, Mutex};
16use tokio::task::JoinHandle;
17use tracing::{error, info, warn};
18
/// Buffer size for the message channel between per-partition tasks and the consumer.
///
/// Used as the capacity of the `mpsc::channel` created in `Consumer::receive`.
const RECEIVE_CHANNEL_BUFFER: usize = 100;
/// Small delay, in milliseconds, after signaling shutdown to allow the broker to observe closure.
///
/// `Consumer::close` sleeps for this long as its last step.
const GRACEFUL_CLOSE_DELAY_MS: u64 = 100;
23
/// Represents the type of subscription a consumer attaches with.
///
/// Variants:
/// - `Exclusive`: Only one consumer can subscribe to the topic at a time.
/// - `Shared`: Multiple consumers can subscribe to the topic concurrently.
///   This is the default used when no subscription type is specified.
/// - `FailOver`: Multiple consumers can subscribe, but only one is active at a time;
///   the others wait in standby and take over if the active consumer disconnects.
/// - `KeyShared`: Subscription type used together with key filter patterns
///   (glob syntax such as `"user-*"`, see `ConsumerBuilder::with_key_filter`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SubType {
    Exclusive,
    Shared,
    FailOver,
    KeyShared,
}
38
39/// Consumer represents a message consumer that subscribes to a topic and receives messages.
40/// It handles communication with the message broker and manages the consumer's state.
41#[derive(Debug)]
42pub struct Consumer {
43    // the Danube client
44    client: DanubeClient,
45    // the topic name, from where the messages are consumed
46    topic_name: String,
47    // the name of the Consumer
48    consumer_name: String,
49    // the map between the partitioned topic name and the consumer instance
50    consumers: HashMap<String, Arc<Mutex<TopicConsumer>>>,
51    // the name of the subscription the consumer is attached to
52    subscription: String,
53    // the type of the subscription, that can be Shared and Exclusive
54    subscription_type: SubType,
55    // other configurable options for the consumer
56    consumer_options: ConsumerOptions,
57    // key filter patterns for KeyShared subscriptions
58    key_filters: Vec<String>,
59    // shutdown flag and task handles for graceful close
60    shutdown: Arc<AtomicBool>,
61    task_handles: Vec<JoinHandle<()>>,
62}
63
64impl Consumer {
65    pub(crate) fn new(
66        client: DanubeClient,
67        topic_name: String,
68        consumer_name: String,
69        subscription: String,
70        sub_type: Option<SubType>,
71        consumer_options: ConsumerOptions,
72        key_filters: Vec<String>,
73    ) -> Self {
74        let subscription_type = sub_type.unwrap_or(SubType::Shared);
75
76        Consumer {
77            client,
78            topic_name,
79            consumer_name,
80            consumers: HashMap::new(),
81            subscription,
82            subscription_type,
83            consumer_options,
84            key_filters,
85            shutdown: Arc::new(AtomicBool::new(false)),
86            task_handles: Vec::new(),
87        }
88    }
89
90    /// Initializes the subscription to a non-partitioned or partitioned topic and starts the health check service.
91    ///
92    /// This function establishes a gRPC connection with the brokers and requests to subscribe to the specified topic.
93    ///
94    /// # Errors
95    /// If an error occurs during subscription or initialization, it is returned as part of the `Err` variant.
96    pub async fn subscribe(&mut self) -> Result<()> {
97        // Get partitions from the topic
98        let partitions = self
99            .client
100            .lookup_service
101            .topic_partitions(&self.client.uri, &self.topic_name)
102            .await?;
103
104        // Create TopicConsumer for each partition
105        let mut tasks = Vec::new();
106        for topic_partition in partitions {
107            let topic_name = topic_partition.clone();
108            let consumer_name = self.consumer_name.clone();
109            let subscription = self.subscription.clone();
110            let subscription_type = self.subscription_type.clone();
111            let consumer_options = self.consumer_options.clone();
112            let key_filters = self.key_filters.clone();
113            let client = self.client.clone();
114
115            let task = tokio::spawn(async move {
116                let mut topic_consumer = TopicConsumer::new(
117                    client,
118                    topic_name,
119                    consumer_name,
120                    subscription,
121                    Some(subscription_type),
122                    key_filters,
123                    consumer_options,
124                );
125                match topic_consumer.subscribe().await {
126                    Ok(_) => Ok(topic_consumer),
127                    Err(e) => Err(e),
128                }
129            });
130
131            tasks.push(task);
132        }
133
134        // Wait for all tasks to complete
135        let results = join_all(tasks).await;
136
137        // Collect results
138        let mut topic_consumers = HashMap::new();
139        for result in results {
140            match result {
141                Ok(Ok(consumer)) => {
142                    topic_consumers.insert(
143                        consumer.get_topic_name().to_string(),
144                        Arc::new(Mutex::new(consumer)),
145                    );
146                }
147                Ok(Err(e)) => return Err(e),
148                Err(e) => return Err(DanubeError::Unrecoverable(e.to_string())),
149            }
150        }
151
152        if topic_consumers.is_empty() {
153            return Err(DanubeError::Unrecoverable(format!(
154                "No topics found for '{}'",
155                self.topic_name
156            )));
157        }
158
159        self.consumers.extend(topic_consumers.into_iter());
160        Ok(())
161    }
162
163    /// Starts receiving messages from the subscribed partitioned or non-partitioned topic.
164    ///
165    /// This function continuously polls for new messages and handles them as long as the `stop_signal` has not been set to `true`.
166    ///
167    /// # Returns
168    ///
169    /// A `Result` with:
170    /// - `Ok(mpsc::Receiver<StreamMessage>)` if the receive client is successfully created and ready to receive messages.
171    /// - `Err(e)` if the receive client cannot be created or if other issues occur.
172    pub async fn receive(&mut self) -> Result<mpsc::Receiver<StreamMessage>> {
173        let (tx, rx) = mpsc::channel(RECEIVE_CHANNEL_BUFFER);
174
175        let retry_manager = RetryManager::new(
176            self.consumer_options.max_retries,
177            self.consumer_options.base_backoff_ms,
178            self.consumer_options.max_backoff_ms,
179        );
180
181        for (_, consumer) in &self.consumers {
182            let broker_stop = {
183                let locked = consumer.lock().await;
184                Arc::clone(&locked.stop_signal)
185            };
186            let handle = tokio::spawn(partition_receive_loop(
187                Arc::clone(consumer),
188                tx.clone(),
189                retry_manager.clone(),
190                self.shutdown.clone(),
191                broker_stop,
192            ));
193            self.task_handles.push(handle);
194        }
195
196        Ok(rx)
197    }
198
199    pub async fn ack(&mut self, message: &StreamMessage) -> Result<()> {
200        let topic_name = message.msg_id.topic_name.clone();
201        let topic_consumer = self.consumers.get_mut(&topic_name);
202        if let Some(topic_consumer) = topic_consumer {
203            let mut topic_consumer = topic_consumer.lock().await;
204            let _ = topic_consumer
205                .send_ack(
206                    message.request_id,
207                    message.msg_id.clone(),
208                    &self.subscription,
209                )
210                .await?;
211        }
212        Ok(())
213    }
214
215    pub async fn nack(
216        &mut self,
217        message: &StreamMessage,
218        delay_ms: Option<u64>,
219        reason: Option<String>,
220    ) -> Result<()> {
221        let topic_name = message.msg_id.topic_name.clone();
222        let topic_consumer = self.consumers.get_mut(&topic_name);
223        if let Some(topic_consumer) = topic_consumer {
224            let mut topic_consumer = topic_consumer.lock().await;
225            let _ = topic_consumer
226                .send_nack(
227                    message.request_id,
228                    message.msg_id.clone(),
229                    &self.subscription,
230                    delay_ms,
231                    reason,
232                )
233                .await?;
234        }
235        Ok(())
236    }
237
238    /// Gracefully close all receive tasks and stop background activities for this consumer
239    pub async fn close(&mut self) {
240        // signal shutdown
241        self.shutdown.store(true, Ordering::SeqCst);
242        // stop topic-level activities (e.g., health checks)
243        for (_, topic_consumer) in self.consumers.iter() {
244            let locked = topic_consumer.lock().await;
245            locked.stop();
246        }
247        // abort receive tasks
248        for handle in self.task_handles.drain(..) {
249            handle.abort();
250        }
251        // small delay to allow server to observe closure
252        tokio::time::sleep(std::time::Duration::from_millis(GRACEFUL_CLOSE_DELAY_MS)).await;
253    }
254}
255
256/// Per-partition receive loop: opens a message stream, forwards messages to `tx`,
257/// and handles retries / resubscription on errors.
258async fn partition_receive_loop(
259    consumer: Arc<Mutex<TopicConsumer>>,
260    tx: mpsc::Sender<StreamMessage>,
261    retry_manager: RetryManager,
262    shutdown: Arc<AtomicBool>,
263    broker_stop: Arc<AtomicBool>,
264) {
265    let mut attempts = 0;
266
267    loop {
268        if shutdown.load(Ordering::SeqCst) {
269            return;
270        }
271
272        let stream_result = {
273            let mut locked = consumer.lock().await;
274            locked.receive().await
275        };
276
277        match stream_result {
278            Ok(mut stream) => {
279                attempts = 0;
280
281                while !shutdown.load(Ordering::SeqCst) && !broker_stop.load(Ordering::Relaxed) {
282                    match stream.next().await {
283                        Some(Ok(stream_message)) => {
284                            let message: StreamMessage = stream_message.into();
285                            if tx.send(message).await.is_err() {
286                                return; // Channel closed
287                            }
288                        }
289                        Some(Err(e)) => {
290                            warn!(error = %e, "error receiving message");
291                            break; // Stream error, will retry
292                        }
293                        None => break, // Stream ended, will retry
294                    }
295                }
296
297                if shutdown.load(Ordering::SeqCst) {
298                    return;
299                }
300
301                // Broker signaled Close via health check — resubscribe immediately
302                if broker_stop.load(Ordering::Relaxed) {
303                    broker_stop.store(false, Ordering::Relaxed);
304                    warn!("broker signaled topic close, triggering resubscription");
305                    match resubscribe(&consumer).await {
306                        Ok(_) => {
307                            info!("resubscription successful after broker close signal");
308                            continue;
309                        }
310                        Err(e) => {
311                            error!(error = ?e, "resubscription failed after broker close signal");
312                            return;
313                        }
314                    }
315                }
316            }
317
318            // Unrecoverable: attempt resubscription
319            Err(ref error) if matches!(error, DanubeError::Unrecoverable(_)) => {
320                if shutdown.load(Ordering::SeqCst) {
321                    return;
322                }
323                warn!(error = ?error, "unrecoverable error, attempting resubscription");
324                match resubscribe(&consumer).await {
325                    Ok(_) => {
326                        info!("resubscription successful after unrecoverable error");
327                        attempts = 0;
328                        continue;
329                    }
330                    Err(e) => {
331                        error!(error = ?e, "resubscription failed after unrecoverable error");
332                        return;
333                    }
334                }
335            }
336
337            // Retryable: backoff, then escalate to resubscription after max retries
338            Err(error) if retry_manager.is_retryable_error(&error) => {
339                if shutdown.load(Ordering::SeqCst) {
340                    return;
341                }
342                attempts += 1;
343                if attempts > retry_manager.max_retries() {
344                    warn!("max retries exceeded, attempting resubscription");
345                    match resubscribe(&consumer).await {
346                        Ok(_) => {
347                            info!("resubscription successful");
348                            attempts = 0;
349                            continue;
350                        }
351                        Err(e) => {
352                            error!(error = ?e, "resubscription failed");
353                            return;
354                        }
355                    }
356                }
357                let backoff = retry_manager.calculate_backoff(attempts - 1);
358                tokio::time::sleep(backoff).await;
359            }
360
361            // Non-retryable: bail
362            Err(error) => {
363                error!(error = ?error, "non-retryable error in consumer receive");
364                return;
365            }
366        }
367    }
368}
369
370/// Resubscribe a topic consumer (e.g., after an unrecoverable error or max retries exceeded).
371async fn resubscribe(consumer: &Arc<Mutex<TopicConsumer>>) -> Result<()> {
372    let mut locked = consumer.lock().await;
373    locked.subscribe().await?;
374    Ok(())
375}
376
377/// ConsumerBuilder is a builder for creating a new Consumer instance.
378///
379/// It allows setting various properties for the consumer such as topic, name, subscription,
380/// subscription type, and options.
381#[derive(Debug, Clone)]
382pub struct ConsumerBuilder {
383    client: DanubeClient,
384    topic: Option<String>,
385    consumer_name: Option<String>,
386    subscription: Option<String>,
387    subscription_type: Option<SubType>,
388    consumer_options: ConsumerOptions,
389    key_filters: Vec<String>,
390}
391
392impl ConsumerBuilder {
393    pub fn new(client: &DanubeClient) -> Self {
394        ConsumerBuilder {
395            client: client.clone(),
396            topic: None,
397            consumer_name: None,
398            subscription: None,
399            subscription_type: None,
400            consumer_options: ConsumerOptions::default(),
401            key_filters: Vec::new(),
402        }
403    }
404
405    /// Sets the topic name for the consumer.
406    ///
407    /// This method specifies the topic that the consumer will subscribe to. It is a required field and must be set before the consumer can be created.
408    ///
409    /// # Parameters
410    ///
411    /// - `topic`: The name of the topic for the consumer. This should be a non-empty string that corresponds to an existing topic.
412    pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
413        self.topic = Some(topic.into());
414        self
415    }
416
417    /// Sets the name of the consumer instance.
418    ///
419    /// This method specifies the name to be assigned to the consumer. It is a required field and must be set before the consumer can be created.
420    ///
421    /// # Parameters
422    ///
423    /// - `consumer_name`: The name for the consumer instance. This should be a non-empty string that uniquely identifies the consumer.
424    pub fn with_consumer_name(mut self, consumer_name: impl Into<String>) -> Self {
425        self.consumer_name = Some(consumer_name.into());
426        self
427    }
428
429    /// Sets the name of the subscription for the consumer.
430    ///
431    /// This method specifies the subscription that the consumer will use. It is a required field and must be set before the consumer can be created.
432    ///
433    /// # Parameters
434    ///
435    /// - `subscription_name`: The name of the subscription. This should be a non-empty string that identifies the subscription to which the consumer will be subscribed.
436    pub fn with_subscription(mut self, subscription_name: impl Into<String>) -> Self {
437        self.subscription = Some(subscription_name.into());
438        self
439    }
440
441    /// Sets the type of subscription for the consumer. This field is optional.
442    ///
443    /// This method specifies the type of subscription that the consumer will use. The subscription type determines how messages are distributed to consumers that share the same subscription.
444    ///
445    /// # Parameters
446    ///
447    /// - `sub_type`: The type of subscription. This should be one of the following:
448    ///   - `SubType::Exclusive`: The consumer exclusively receives all messages for the subscription.
449    ///   - `SubType::Shared`: Messages are distributed among multiple consumers sharing the same subscription. Default if not specified.
450    ///   - `SubType::FailOver`: Only one consumer receives messages, and if it fails, another consumer takes over.
451    pub fn with_subscription_type(mut self, subscription_type: SubType) -> Self {
452        self.subscription_type = Some(subscription_type);
453        self
454    }
455
456    /// Sets the configuration options for the consumer, allowing customization of retry behavior.
457    pub fn with_options(mut self, options: ConsumerOptions) -> Self {
458        self.consumer_options = options;
459        self
460    }
461
462    /// Add a key filter pattern for KeyShared subscriptions.
463    /// Uses glob syntax: "user-*", "eu-west-?", "*"
464    pub fn with_key_filter(mut self, pattern: impl Into<String>) -> Self {
465        self.key_filters.push(pattern.into());
466        self
467    }
468
469    /// Add multiple key filter patterns for KeyShared subscriptions.
470    pub fn with_key_filters(mut self, patterns: Vec<String>) -> Self {
471        self.key_filters.extend(patterns);
472        self
473    }
474
475    /// Creates a new `Consumer` instance using the settings configured in the `ConsumerBuilder`.
476    ///
477    /// This method performs validation to ensure that all required fields are set before creating the `Consumer`.  Once validation is successful, it constructs and returns a new `Consumer` instance configured with the specified settings.
478    ///
479    /// # Returns
480    ///
481    /// -  A `Consumer` instance if the builder configuration is valid and the consumer is created successfully.
482    pub fn build(self) -> Result<Consumer> {
483        let topic = self.topic.ok_or_else(|| {
484            DanubeError::Unrecoverable("topic is required to build a Consumer".into())
485        })?;
486        let consumer_name = self.consumer_name.ok_or_else(|| {
487            DanubeError::Unrecoverable("consumer name is required to build a Consumer".into())
488        })?;
489        let subscription = self.subscription.ok_or_else(|| {
490            DanubeError::Unrecoverable("subscription is required to build a Consumer".into())
491        })?;
492        Ok(Consumer::new(
493            self.client,
494            topic,
495            consumer_name,
496            subscription,
497            self.subscription_type,
498            self.consumer_options,
499            self.key_filters,
500        ))
501    }
502}
503
/// Retry and backoff settings for a consumer.
///
/// The derived `Default` sets every field to zero.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ConsumerOptions {
    /// Maximum number of retry attempts.
    pub max_retries: usize,
    /// Base backoff in milliseconds for exponential backoff.
    pub base_backoff_ms: u64,
    /// Maximum backoff cap in milliseconds.
    pub max_backoff_ms: u64,
}

impl ConsumerOptions {
    /// Builds options from explicit retry settings.
    ///
    /// # Parameters
    ///
    /// - `max_retries`: maximum number of retry attempts.
    /// - `base_backoff_ms`: base backoff in milliseconds.
    /// - `max_backoff_ms`: upper bound for the backoff in milliseconds.
    pub fn new(max_retries: usize, base_backoff_ms: u64, max_backoff_ms: u64) -> Self {
        Self {
            max_retries,
            base_backoff_ms,
            max_backoff_ms,
        }
    }
}