danube_client/consumer.rs
1use crate::{
2 errors::{DanubeError, Result},
3 retry_manager::RetryManager,
4 topic_consumer::TopicConsumer,
5 DanubeClient,
6};
7
8use danube_core::message::StreamMessage;
9use futures::{future::join_all, StreamExt};
10use std::collections::HashMap;
11use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
12use tokio::sync::{mpsc, Mutex};
13use tokio::task::JoinHandle;
14
/// The type of subscription a consumer uses when attaching to a topic.
///
/// Variants:
/// - `Exclusive`: only one consumer can subscribe to the topic at a time.
/// - `Shared`: multiple consumers can subscribe to the topic concurrently.
/// - `FailOver`: only one consumer is active at a time; additional consumers may
///   subscribe but wait in standby and take over if the active consumer disconnects.
///
/// The enum can be compared and hashed, so callers may branch on it with `==`
/// or use it as a map/set key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubType {
    Exclusive,
    Shared,
    FailOver,
}
28
29/// Consumer represents a message consumer that subscribes to a topic and receives messages.
30/// It handles communication with the message broker and manages the consumer's state.
31#[derive(Debug)]
32pub struct Consumer {
33 // the Danube client
34 client: DanubeClient,
35 // the topic name, from where the messages are consumed
36 topic_name: String,
37 // the name of the Consumer
38 consumer_name: String,
39 // the map between the partitioned topic name and the consumer instance
40 consumers: HashMap<String, Arc<Mutex<TopicConsumer>>>,
41 // the name of the subscription the consumer is attached to
42 subscription: String,
43 // the type of the subscription, that can be Shared and Exclusive
44 subscription_type: SubType,
45 // other configurable options for the consumer
46 consumer_options: ConsumerOptions,
47 // shutdown flag and task handles for graceful close
48 shutdown: Arc<AtomicBool>,
49 task_handles: Vec<JoinHandle<()>>,
50}
51
52impl Consumer {
53 pub(crate) fn new(
54 client: DanubeClient,
55 topic_name: String,
56 consumer_name: String,
57 subscription: String,
58 sub_type: Option<SubType>,
59 consumer_options: ConsumerOptions,
60 ) -> Self {
61 let subscription_type = if let Some(sub_type) = sub_type {
62 sub_type
63 } else {
64 SubType::Shared
65 };
66
67 Consumer {
68 client,
69 topic_name,
70 consumer_name,
71 consumers: HashMap::new(),
72 subscription,
73 subscription_type,
74 consumer_options,
75 shutdown: Arc::new(AtomicBool::new(false)),
76 task_handles: Vec::new(),
77 }
78 }
79
80 /// Initializes the subscription to a non-partitioned or partitioned topic and starts the health check service.
81 ///
82 /// This function establishes a gRPC connection with the brokers and requests to subscribe to the specified topic.
83 ///
84 /// # Errors
85 /// If an error occurs during subscription or initialization, it is returned as part of the `Err` variant.
86 pub async fn subscribe(&mut self) -> Result<()> {
87 // Get partitions from the topic
88 let partitions = self
89 .client
90 .lookup_service
91 .topic_partitions(&self.client.uri, &self.topic_name)
92 .await?;
93
94 // Create TopicConsumer for each partition
95 let mut tasks = Vec::new();
96 for topic_partition in partitions {
97 let topic_name = topic_partition.clone();
98 let consumer_name = self.consumer_name.clone();
99 let subscription = self.subscription.clone();
100 let subscription_type = self.subscription_type.clone();
101 let consumer_options = self.consumer_options.clone();
102 let client = self.client.clone();
103
104 let task = tokio::spawn(async move {
105 let mut topic_consumer = TopicConsumer::new(
106 client,
107 topic_name,
108 consumer_name,
109 subscription,
110 Some(subscription_type),
111 consumer_options,
112 );
113 match topic_consumer.subscribe().await {
114 Ok(_) => Ok(topic_consumer),
115 Err(e) => Err(e),
116 }
117 });
118
119 tasks.push(task);
120 }
121
122 // Wait for all tasks to complete
123 let results = join_all(tasks).await;
124
125 // Collect results
126 let mut topic_consumers = HashMap::new();
127 for result in results {
128 match result {
129 Ok(Ok(consumer)) => {
130 topic_consumers.insert(
131 consumer.get_topic_name().to_string(),
132 Arc::new(Mutex::new(consumer)),
133 );
134 }
135 Ok(Err(e)) => return Err(e),
136 Err(e) => return Err(DanubeError::Unrecoverable(e.to_string())),
137 }
138 }
139
140 if topic_consumers.is_empty() {
141 return Err(DanubeError::Unrecoverable(
142 "No partitions found".to_string(),
143 ));
144 }
145
146 self.consumers.extend(topic_consumers.into_iter());
147 Ok(())
148 }
149
150 /// Starts receiving messages from the subscribed partitioned or non-partitioned topic.
151 ///
152 /// This function continuously polls for new messages and handles them as long as the `stop_signal` has not been set to `true`.
153 ///
154 /// # Returns
155 ///
156 /// A `Result` with:
157 /// - `Ok(mpsc::Receiver<StreamMessage>)` if the receive client is successfully created and ready to receive messages.
158 /// - `Err(e)` if the receive client cannot be created or if other issues occur.
159 pub async fn receive(&mut self) -> Result<mpsc::Receiver<StreamMessage>> {
160 // Create a channel to send messages to the client
161 let (tx, rx) = mpsc::channel(100); // Buffer size of 100, adjust as needed
162
163 // Create retry manager for consumers
164 let retry_manager = RetryManager::new(
165 self.consumer_options.max_retries,
166 self.consumer_options.base_backoff_ms,
167 self.consumer_options.max_backoff_ms,
168 );
169
170 // Spawn a task for each cloned TopicConsumer
171 for (_, consumer) in &self.consumers {
172 let tx = tx.clone();
173 let consumer = Arc::clone(consumer);
174 let retry_manager = retry_manager.clone();
175 let shutdown = self.shutdown.clone();
176
177 let handle: JoinHandle<()> = tokio::spawn(async move {
178 let mut attempts = 0;
179 let max_retries = if retry_manager.max_retries() == 0 {
180 5
181 } else {
182 retry_manager.max_retries()
183 };
184
185 loop {
186 if shutdown.load(Ordering::SeqCst) { return; }
187 // Try to get stream from consumer (subscribe is handled internally with its own retry)
188 let stream_result = {
189 let mut locked = consumer.lock().await;
190 locked.receive().await
191 };
192
193 match stream_result {
194 Ok(mut stream) => {
195 attempts = 0; // Reset attempts on successful connection
196
197 // Process messages until stream ends or errors
198 while !shutdown.load(Ordering::SeqCst) {
199 let message_opt = stream.next().await;
200 if message_opt.is_none() { break; }
201 let message = message_opt.unwrap();
202 match message {
203 Ok(stream_message) => {
204 let message: StreamMessage = stream_message.into();
205 if tx.send(message).await.is_err() {
206 // Channel is closed, exit the task
207 return;
208 }
209 }
210 Err(e) => {
211 eprintln!("Error receiving message: {}", e);
212 break; // Stream error, will retry receive
213 }
214 }
215 }
216 // Stream ended, retry receive
217 }
218 Err(error) => {
219 if shutdown.load(Ordering::SeqCst) { return; }
220 // Check if this is an unrecoverable error (e.g., stream client not initialized)
221 if matches!(error, DanubeError::Unrecoverable(_)) {
222 eprintln!("Unrecoverable error detected, attempting resubscription: {:?}", error);
223
224 // Attempt to resubscribe for unrecoverable errors
225 let resubscribe_result = {
226 let mut locked = consumer.lock().await;
227 locked.subscribe().await
228 };
229
230 match resubscribe_result {
231 Ok(_) => {
232 eprintln!("Resubscription successful after unrecoverable error, continuing...");
233 attempts = 0; // Reset attempts after successful resubscription
234 continue; // Go back to creating stream_result
235 }
236 Err(e) => {
237 eprintln!("Resubscription failed after unrecoverable error: {:?}", e);
238 return; // Exit task if resubscription fails
239 }
240 }
241 }
242
243 // Failed to get stream, check if retryable
244 if retry_manager.is_retryable_error(&error) {
245 attempts += 1;
246 if attempts > max_retries {
247 eprintln!("Max retries exceeded for consumer receive, attempting resubscription");
248
249 // Attempt to resubscribe
250 let resubscribe_result = {
251 let mut locked = consumer.lock().await;
252 locked.subscribe().await
253 };
254
255 match resubscribe_result {
256 Ok(_) => {
257 eprintln!("Resubscription successful, continuing...");
258 break; // Break out of retry loop and go back to creating stream_result
259 }
260 Err(e) => {
261 eprintln!("Resubscription failed: {:?}", e);
262 return; // Exit task if resubscription fails
263 }
264 }
265 }
266 let backoff = retry_manager.calculate_backoff(attempts - 1);
267 tokio::time::sleep(backoff).await;
268 } else {
269 eprintln!("Non-retryable error in consumer receive: {:?}", error);
270 return; // Non-retryable error
271 }
272 }
273 }
274 }
275 });
276 self.task_handles.push(handle);
277 }
278
279 Ok(rx)
280 }
281
282 pub async fn ack(&mut self, message: &StreamMessage) -> Result<()> {
283 let topic_name = message.msg_id.topic_name.clone();
284 let topic_consumer = self.consumers.get_mut(&topic_name);
285 if let Some(topic_consumer) = topic_consumer {
286 let mut topic_consumer = topic_consumer.lock().await;
287 let _ = topic_consumer
288 .send_ack(
289 message.request_id,
290 message.msg_id.clone(),
291 &self.subscription,
292 )
293 .await?;
294 }
295 Ok(())
296 }
297
298 /// Gracefully close all receive tasks and stop background activities for this consumer
299 pub async fn close(&mut self) {
300 // signal shutdown
301 self.shutdown.store(true, Ordering::SeqCst);
302 // stop topic-level activities (e.g., health checks)
303 for (_, topic_consumer) in self.consumers.iter() {
304 let locked = topic_consumer.lock().await;
305 locked.stop();
306 }
307 // abort receive tasks
308 for handle in self.task_handles.drain(..) {
309 handle.abort();
310 }
311 // small delay to allow server to observe closure
312 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
313 }
314}
315
316/// ConsumerBuilder is a builder for creating a new Consumer instance.
317///
318/// It allows setting various properties for the consumer such as topic, name, subscription,
319/// subscription type, and options.
320#[derive(Debug, Clone)]
321pub struct ConsumerBuilder {
322 client: DanubeClient,
323 topic: Option<String>,
324 consumer_name: Option<String>,
325 subscription: Option<String>,
326 subscription_type: Option<SubType>,
327 consumer_options: ConsumerOptions,
328}
329
330impl ConsumerBuilder {
331 pub fn new(client: &DanubeClient) -> Self {
332 ConsumerBuilder {
333 client: client.clone(),
334 topic: None,
335 consumer_name: None,
336 subscription: None,
337 subscription_type: None,
338 consumer_options: ConsumerOptions::default(),
339 }
340 }
341
342 /// Sets the topic name for the consumer.
343 ///
344 /// This method specifies the topic that the consumer will subscribe to. It is a required field and must be set before the consumer can be created.
345 ///
346 /// # Parameters
347 ///
348 /// - `topic`: The name of the topic for the consumer. This should be a non-empty string that corresponds to an existing topic.
349 pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
350 self.topic = Some(topic.into());
351 self
352 }
353
354 /// Sets the name of the consumer instance.
355 ///
356 /// This method specifies the name to be assigned to the consumer. It is a required field and must be set before the consumer can be created.
357 ///
358 /// # Parameters
359 ///
360 /// - `consumer_name`: The name for the consumer instance. This should be a non-empty string that uniquely identifies the consumer.
361 pub fn with_consumer_name(mut self, consumer_name: impl Into<String>) -> Self {
362 self.consumer_name = Some(consumer_name.into());
363 self
364 }
365
366 /// Sets the name of the subscription for the consumer.
367 ///
368 /// This method specifies the subscription that the consumer will use. It is a required field and must be set before the consumer can be created.
369 ///
370 /// # Parameters
371 ///
372 /// - `subscription_name`: The name of the subscription. This should be a non-empty string that identifies the subscription to which the consumer will be subscribed.
373 pub fn with_subscription(mut self, subscription_name: impl Into<String>) -> Self {
374 self.subscription = Some(subscription_name.into());
375 self
376 }
377
378 /// Sets the type of subscription for the consumer. This field is optional.
379 ///
380 /// This method specifies the type of subscription that the consumer will use. The subscription type determines how messages are distributed to consumers that share the same subscription.
381 ///
382 /// # Parameters
383 ///
384 /// - `sub_type`: The type of subscription. This should be one of the following:
385 /// - `SubType::Exclusive`: The consumer exclusively receives all messages for the subscription.
386 /// - `SubType::Shared`: Messages are distributed among multiple consumers sharing the same subscription. Default if not specified.
387 /// - `SubType::FailOver`: Only one consumer receives messages, and if it fails, another consumer takes over.
388 pub fn with_subscription_type(mut self, subscription_type: SubType) -> Self {
389 self.subscription_type = Some(subscription_type);
390 self
391 }
392
393 /// Creates a new `Consumer` instance using the settings configured in the `ConsumerBuilder`.
394 ///
395 /// This method performs validation to ensure that all required fields are set before creating the `Consumer`. Once validation is successful, it constructs and returns a new `Consumer` instance configured with the specified settings.
396 ///
397 /// # Returns
398 ///
399 /// - A `Consumer` instance if the builder configuration is valid and the consumer is created successfully.
400 pub fn build(self) -> Consumer {
401 let topic = self.topic.expect("you should specify the topic");
402 let consumer_name = self
403 .consumer_name
404 .expect("you should provide a name for the consumer");
405 let subscription = self
406 .subscription
407 .expect("you should provide the name of the subscription");
408 Consumer::new(
409 self.client,
410 topic,
411 consumer_name,
412 subscription,
413 self.subscription_type,
414 self.consumer_options,
415 )
416 }
417}
418
/// Configuration options for consumers.
///
/// The derived `Default` yields an empty `others` string and `0` for every
/// numeric field.
#[derive(Clone, Debug, Default)]
pub struct ConsumerOptions {
    /// Reserved for future use.
    pub others: String,
    /// Maximum number of retry attempts.
    pub max_retries: usize,
    /// Base backoff, in milliseconds, for exponential backoff.
    pub base_backoff_ms: u64,
    /// Maximum backoff cap, in milliseconds.
    pub max_backoff_ms: u64,
}