danube_client/consumer.rs
1use crate::{
2 errors::{DanubeError, Result},
3 retry_manager::RetryManager,
4 topic_consumer::TopicConsumer,
5 DanubeClient,
6};
7
8use danube_core::message::StreamMessage;
9use futures::{future::join_all, StreamExt};
10use std::collections::HashMap;
11use std::sync::{
12 atomic::{AtomicBool, Ordering},
13 Arc,
14};
15use tokio::sync::{mpsc, Mutex};
16use tokio::task::JoinHandle;
17use tracing::{error, info, warn};
18
/// Capacity of the bounded channel that carries messages from the per-partition
/// receive tasks to the caller of `Consumer::receive`.
const RECEIVE_CHANNEL_BUFFER: usize = 100;

/// Pause, in milliseconds, taken at the end of `Consumer::close` so the broker has a
/// moment to observe that the consumer went away.
const GRACEFUL_CLOSE_DELAY_MS: u64 = 100;
23
/// The type of a subscription.
///
/// Determines how messages on a subscription are distributed among the consumers
/// attached to it. When no type is specified, `Consumer` uses [`SubType::Shared`].
///
/// Variants:
/// - `Exclusive`: Only one consumer can subscribe to the topic at a time.
/// - `Shared`: Multiple consumers can subscribe to the topic concurrently.
/// - `FailOver`: Multiple consumers can subscribe, but only one is active at a time.
///   The others wait in standby, and one of them takes over if the active consumer
///   disconnects.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SubType {
    /// Only one consumer can subscribe to the topic at a time.
    Exclusive,
    /// Multiple consumers can subscribe concurrently; messages are distributed among them.
    Shared,
    /// One active consumer; the others wait in standby and take over if it disconnects.
    FailOver,
}
37
38/// Consumer represents a message consumer that subscribes to a topic and receives messages.
39/// It handles communication with the message broker and manages the consumer's state.
40#[derive(Debug)]
41pub struct Consumer {
42 // the Danube client
43 client: DanubeClient,
44 // the topic name, from where the messages are consumed
45 topic_name: String,
46 // the name of the Consumer
47 consumer_name: String,
48 // the map between the partitioned topic name and the consumer instance
49 consumers: HashMap<String, Arc<Mutex<TopicConsumer>>>,
50 // the name of the subscription the consumer is attached to
51 subscription: String,
52 // the type of the subscription, that can be Shared and Exclusive
53 subscription_type: SubType,
54 // other configurable options for the consumer
55 consumer_options: ConsumerOptions,
56 // shutdown flag and task handles for graceful close
57 shutdown: Arc<AtomicBool>,
58 task_handles: Vec<JoinHandle<()>>,
59}
60
61impl Consumer {
62 pub(crate) fn new(
63 client: DanubeClient,
64 topic_name: String,
65 consumer_name: String,
66 subscription: String,
67 sub_type: Option<SubType>,
68 consumer_options: ConsumerOptions,
69 ) -> Self {
70 let subscription_type = sub_type.unwrap_or(SubType::Shared);
71
72 Consumer {
73 client,
74 topic_name,
75 consumer_name,
76 consumers: HashMap::new(),
77 subscription,
78 subscription_type,
79 consumer_options,
80 shutdown: Arc::new(AtomicBool::new(false)),
81 task_handles: Vec::new(),
82 }
83 }
84
85 /// Initializes the subscription to a non-partitioned or partitioned topic and starts the health check service.
86 ///
87 /// This function establishes a gRPC connection with the brokers and requests to subscribe to the specified topic.
88 ///
89 /// # Errors
90 /// If an error occurs during subscription or initialization, it is returned as part of the `Err` variant.
91 pub async fn subscribe(&mut self) -> Result<()> {
92 // Get partitions from the topic
93 let partitions = self
94 .client
95 .lookup_service
96 .topic_partitions(&self.client.uri, &self.topic_name)
97 .await?;
98
99 // Create TopicConsumer for each partition
100 let mut tasks = Vec::new();
101 for topic_partition in partitions {
102 let topic_name = topic_partition.clone();
103 let consumer_name = self.consumer_name.clone();
104 let subscription = self.subscription.clone();
105 let subscription_type = self.subscription_type.clone();
106 let consumer_options = self.consumer_options.clone();
107 let client = self.client.clone();
108
109 let task = tokio::spawn(async move {
110 let mut topic_consumer = TopicConsumer::new(
111 client,
112 topic_name,
113 consumer_name,
114 subscription,
115 Some(subscription_type),
116 consumer_options,
117 );
118 match topic_consumer.subscribe().await {
119 Ok(_) => Ok(topic_consumer),
120 Err(e) => Err(e),
121 }
122 });
123
124 tasks.push(task);
125 }
126
127 // Wait for all tasks to complete
128 let results = join_all(tasks).await;
129
130 // Collect results
131 let mut topic_consumers = HashMap::new();
132 for result in results {
133 match result {
134 Ok(Ok(consumer)) => {
135 topic_consumers.insert(
136 consumer.get_topic_name().to_string(),
137 Arc::new(Mutex::new(consumer)),
138 );
139 }
140 Ok(Err(e)) => return Err(e),
141 Err(e) => return Err(DanubeError::Unrecoverable(e.to_string())),
142 }
143 }
144
145 if topic_consumers.is_empty() {
146 return Err(DanubeError::Unrecoverable(
147 "No partitions found".to_string(),
148 ));
149 }
150
151 self.consumers.extend(topic_consumers.into_iter());
152 Ok(())
153 }
154
155 /// Starts receiving messages from the subscribed partitioned or non-partitioned topic.
156 ///
157 /// This function continuously polls for new messages and handles them as long as the `stop_signal` has not been set to `true`.
158 ///
159 /// # Returns
160 ///
161 /// A `Result` with:
162 /// - `Ok(mpsc::Receiver<StreamMessage>)` if the receive client is successfully created and ready to receive messages.
163 /// - `Err(e)` if the receive client cannot be created or if other issues occur.
164 pub async fn receive(&mut self) -> Result<mpsc::Receiver<StreamMessage>> {
165 let (tx, rx) = mpsc::channel(RECEIVE_CHANNEL_BUFFER);
166
167 let retry_manager = RetryManager::new(
168 self.consumer_options.max_retries,
169 self.consumer_options.base_backoff_ms,
170 self.consumer_options.max_backoff_ms,
171 );
172
173 for (_, consumer) in &self.consumers {
174 let broker_stop = {
175 let locked = consumer.lock().await;
176 Arc::clone(&locked.stop_signal)
177 };
178 let handle = tokio::spawn(partition_receive_loop(
179 Arc::clone(consumer),
180 tx.clone(),
181 retry_manager.clone(),
182 self.shutdown.clone(),
183 broker_stop,
184 ));
185 self.task_handles.push(handle);
186 }
187
188 Ok(rx)
189 }
190
191 pub async fn ack(&mut self, message: &StreamMessage) -> Result<()> {
192 let topic_name = message.msg_id.topic_name.clone();
193 let topic_consumer = self.consumers.get_mut(&topic_name);
194 if let Some(topic_consumer) = topic_consumer {
195 let mut topic_consumer = topic_consumer.lock().await;
196 let _ = topic_consumer
197 .send_ack(
198 message.request_id,
199 message.msg_id.clone(),
200 &self.subscription,
201 )
202 .await?;
203 }
204 Ok(())
205 }
206
207 /// Gracefully close all receive tasks and stop background activities for this consumer
208 pub async fn close(&mut self) {
209 // signal shutdown
210 self.shutdown.store(true, Ordering::SeqCst);
211 // stop topic-level activities (e.g., health checks)
212 for (_, topic_consumer) in self.consumers.iter() {
213 let locked = topic_consumer.lock().await;
214 locked.stop();
215 }
216 // abort receive tasks
217 for handle in self.task_handles.drain(..) {
218 handle.abort();
219 }
220 // small delay to allow server to observe closure
221 tokio::time::sleep(std::time::Duration::from_millis(GRACEFUL_CLOSE_DELAY_MS)).await;
222 }
223}
224
225/// Per-partition receive loop: opens a message stream, forwards messages to `tx`,
226/// and handles retries / resubscription on errors.
227async fn partition_receive_loop(
228 consumer: Arc<Mutex<TopicConsumer>>,
229 tx: mpsc::Sender<StreamMessage>,
230 retry_manager: RetryManager,
231 shutdown: Arc<AtomicBool>,
232 broker_stop: Arc<AtomicBool>,
233) {
234 let mut attempts = 0;
235
236 loop {
237 if shutdown.load(Ordering::SeqCst) {
238 return;
239 }
240
241 let stream_result = {
242 let mut locked = consumer.lock().await;
243 locked.receive().await
244 };
245
246 match stream_result {
247 Ok(mut stream) => {
248 attempts = 0;
249
250 while !shutdown.load(Ordering::SeqCst) && !broker_stop.load(Ordering::Relaxed) {
251 match stream.next().await {
252 Some(Ok(stream_message)) => {
253 let message: StreamMessage = stream_message.into();
254 if tx.send(message).await.is_err() {
255 return; // Channel closed
256 }
257 }
258 Some(Err(e)) => {
259 warn!(error = %e, "error receiving message");
260 break; // Stream error, will retry
261 }
262 None => break, // Stream ended, will retry
263 }
264 }
265
266 if shutdown.load(Ordering::SeqCst) {
267 return;
268 }
269
270 // Broker signaled Close via health check — resubscribe immediately
271 if broker_stop.load(Ordering::Relaxed) {
272 broker_stop.store(false, Ordering::Relaxed);
273 warn!("broker signaled topic close, triggering resubscription");
274 match resubscribe(&consumer).await {
275 Ok(_) => {
276 info!("resubscription successful after broker close signal");
277 continue;
278 }
279 Err(e) => {
280 error!(error = ?e, "resubscription failed after broker close signal");
281 return;
282 }
283 }
284 }
285 }
286
287 // Unrecoverable: attempt resubscription
288 Err(ref error) if matches!(error, DanubeError::Unrecoverable(_)) => {
289 if shutdown.load(Ordering::SeqCst) {
290 return;
291 }
292 warn!(error = ?error, "unrecoverable error, attempting resubscription");
293 match resubscribe(&consumer).await {
294 Ok(_) => {
295 info!("resubscription successful after unrecoverable error");
296 attempts = 0;
297 continue;
298 }
299 Err(e) => {
300 error!(error = ?e, "resubscription failed after unrecoverable error");
301 return;
302 }
303 }
304 }
305
306 // Retryable: backoff, then escalate to resubscription after max retries
307 Err(error) if retry_manager.is_retryable_error(&error) => {
308 if shutdown.load(Ordering::SeqCst) {
309 return;
310 }
311 attempts += 1;
312 if attempts > retry_manager.max_retries() {
313 warn!("max retries exceeded, attempting resubscription");
314 match resubscribe(&consumer).await {
315 Ok(_) => {
316 info!("resubscription successful");
317 attempts = 0;
318 continue;
319 }
320 Err(e) => {
321 error!(error = ?e, "resubscription failed");
322 return;
323 }
324 }
325 }
326 let backoff = retry_manager.calculate_backoff(attempts - 1);
327 tokio::time::sleep(backoff).await;
328 }
329
330 // Non-retryable: bail
331 Err(error) => {
332 error!(error = ?error, "non-retryable error in consumer receive");
333 return;
334 }
335 }
336 }
337}
338
339/// Resubscribe a topic consumer (e.g., after an unrecoverable error or max retries exceeded).
340async fn resubscribe(consumer: &Arc<Mutex<TopicConsumer>>) -> Result<()> {
341 let mut locked = consumer.lock().await;
342 locked.subscribe().await?;
343 Ok(())
344}
345
346/// ConsumerBuilder is a builder for creating a new Consumer instance.
347///
348/// It allows setting various properties for the consumer such as topic, name, subscription,
349/// subscription type, and options.
350#[derive(Debug, Clone)]
351pub struct ConsumerBuilder {
352 client: DanubeClient,
353 topic: Option<String>,
354 consumer_name: Option<String>,
355 subscription: Option<String>,
356 subscription_type: Option<SubType>,
357 consumer_options: ConsumerOptions,
358}
359
360impl ConsumerBuilder {
361 pub fn new(client: &DanubeClient) -> Self {
362 ConsumerBuilder {
363 client: client.clone(),
364 topic: None,
365 consumer_name: None,
366 subscription: None,
367 subscription_type: None,
368 consumer_options: ConsumerOptions::default(),
369 }
370 }
371
372 /// Sets the topic name for the consumer.
373 ///
374 /// This method specifies the topic that the consumer will subscribe to. It is a required field and must be set before the consumer can be created.
375 ///
376 /// # Parameters
377 ///
378 /// - `topic`: The name of the topic for the consumer. This should be a non-empty string that corresponds to an existing topic.
379 pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
380 self.topic = Some(topic.into());
381 self
382 }
383
384 /// Sets the name of the consumer instance.
385 ///
386 /// This method specifies the name to be assigned to the consumer. It is a required field and must be set before the consumer can be created.
387 ///
388 /// # Parameters
389 ///
390 /// - `consumer_name`: The name for the consumer instance. This should be a non-empty string that uniquely identifies the consumer.
391 pub fn with_consumer_name(mut self, consumer_name: impl Into<String>) -> Self {
392 self.consumer_name = Some(consumer_name.into());
393 self
394 }
395
396 /// Sets the name of the subscription for the consumer.
397 ///
398 /// This method specifies the subscription that the consumer will use. It is a required field and must be set before the consumer can be created.
399 ///
400 /// # Parameters
401 ///
402 /// - `subscription_name`: The name of the subscription. This should be a non-empty string that identifies the subscription to which the consumer will be subscribed.
403 pub fn with_subscription(mut self, subscription_name: impl Into<String>) -> Self {
404 self.subscription = Some(subscription_name.into());
405 self
406 }
407
408 /// Sets the type of subscription for the consumer. This field is optional.
409 ///
410 /// This method specifies the type of subscription that the consumer will use. The subscription type determines how messages are distributed to consumers that share the same subscription.
411 ///
412 /// # Parameters
413 ///
414 /// - `sub_type`: The type of subscription. This should be one of the following:
415 /// - `SubType::Exclusive`: The consumer exclusively receives all messages for the subscription.
416 /// - `SubType::Shared`: Messages are distributed among multiple consumers sharing the same subscription. Default if not specified.
417 /// - `SubType::FailOver`: Only one consumer receives messages, and if it fails, another consumer takes over.
418 pub fn with_subscription_type(mut self, subscription_type: SubType) -> Self {
419 self.subscription_type = Some(subscription_type);
420 self
421 }
422
423 /// Creates a new `Consumer` instance using the settings configured in the `ConsumerBuilder`.
424 ///
425 /// This method performs validation to ensure that all required fields are set before creating the `Consumer`. Once validation is successful, it constructs and returns a new `Consumer` instance configured with the specified settings.
426 ///
427 /// # Returns
428 ///
429 /// - A `Consumer` instance if the builder configuration is valid and the consumer is created successfully.
430 pub fn build(self) -> Result<Consumer> {
431 let topic = self.topic.ok_or_else(|| {
432 DanubeError::Unrecoverable("topic is required to build a Consumer".into())
433 })?;
434 let consumer_name = self.consumer_name.ok_or_else(|| {
435 DanubeError::Unrecoverable("consumer name is required to build a Consumer".into())
436 })?;
437 let subscription = self.subscription.ok_or_else(|| {
438 DanubeError::Unrecoverable("subscription is required to build a Consumer".into())
439 })?;
440 Ok(Consumer::new(
441 self.client,
442 topic,
443 consumer_name,
444 subscription,
445 self.subscription_type,
446 self.consumer_options,
447 ))
448 }
449}
450
/// Configuration options for consumers.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate cannot build it with
/// a struct literal. Start from `ConsumerOptions::default()` and assign the fields
/// you need.
///
/// NOTE(review): `Default` sets every field to 0. Whether 0 means "use a built-in
/// default" or is taken literally is decided by `RetryManager::new`; confirm there.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ConsumerOptions {
    /// Maximum number of consecutive retry attempts before the receive loop falls
    /// back to resubscribing.
    pub max_retries: usize,
    /// Base backoff, in milliseconds, for the exponential backoff between retries.
    pub base_backoff_ms: u64,
    /// Maximum backoff cap, in milliseconds.
    pub max_backoff_ms: u64,
}