1use crate::{
2 errors::{DanubeError, Result},
3 retry_manager::RetryManager,
4 topic_consumer::TopicConsumer,
5 DanubeClient,
6};
7
8use danube_core::message::StreamMessage;
9use futures::{future::join_all, StreamExt};
10use std::collections::HashMap;
11use std::sync::{
12 atomic::{AtomicBool, Ordering},
13 Arc,
14};
15use tokio::sync::{mpsc, Mutex};
16use tokio::task::JoinHandle;
17use tracing::{error, info, warn};
18
/// Capacity of the bounded `mpsc` channel created by `Consumer::receive`.
/// All partition receive loops share this one channel.
const RECEIVE_CHANNEL_BUFFER: usize = 100;
/// Time, in milliseconds, that `Consumer::close` sleeps after signalling
/// shutdown and aborting the background receive tasks.
const GRACEFUL_CLOSE_DELAY_MS: u64 = 100;
23
/// Subscription type requested when a consumer attaches to a topic.
///
/// The value is forwarded unchanged to every per-partition `TopicConsumer`.
/// When no type is supplied, `Consumer::new` falls back to [`SubType::Shared`].
///
/// The delivery semantics of each variant are implemented outside this file;
/// the per-variant notes below only restate the names.
///
/// `PartialEq`, `Eq` and `Hash` are derived so callers can compare
/// subscription types or use them as map/set keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubType {
    /// Exclusive subscription.
    Exclusive,
    /// Shared subscription; the default used by `Consumer::new`.
    Shared,
    /// Fail-over subscription.
    FailOver,
    /// Key-shared subscription.
    KeyShared,
}
38
39#[derive(Debug)]
42pub struct Consumer {
43 client: DanubeClient,
45 topic_name: String,
47 consumer_name: String,
49 consumers: HashMap<String, Arc<Mutex<TopicConsumer>>>,
51 subscription: String,
53 subscription_type: SubType,
55 consumer_options: ConsumerOptions,
57 key_filters: Vec<String>,
59 shutdown: Arc<AtomicBool>,
61 task_handles: Vec<JoinHandle<()>>,
62}
63
64impl Consumer {
65 pub(crate) fn new(
66 client: DanubeClient,
67 topic_name: String,
68 consumer_name: String,
69 subscription: String,
70 sub_type: Option<SubType>,
71 consumer_options: ConsumerOptions,
72 key_filters: Vec<String>,
73 ) -> Self {
74 let subscription_type = sub_type.unwrap_or(SubType::Shared);
75
76 Consumer {
77 client,
78 topic_name,
79 consumer_name,
80 consumers: HashMap::new(),
81 subscription,
82 subscription_type,
83 consumer_options,
84 key_filters,
85 shutdown: Arc::new(AtomicBool::new(false)),
86 task_handles: Vec::new(),
87 }
88 }
89
90 pub async fn subscribe(&mut self) -> Result<()> {
97 let partitions = self
99 .client
100 .lookup_service
101 .topic_partitions(&self.client.uri, &self.topic_name)
102 .await?;
103
104 let mut tasks = Vec::new();
106 for topic_partition in partitions {
107 let topic_name = topic_partition.clone();
108 let consumer_name = self.consumer_name.clone();
109 let subscription = self.subscription.clone();
110 let subscription_type = self.subscription_type.clone();
111 let consumer_options = self.consumer_options.clone();
112 let key_filters = self.key_filters.clone();
113 let client = self.client.clone();
114
115 let task = tokio::spawn(async move {
116 let mut topic_consumer = TopicConsumer::new(
117 client,
118 topic_name,
119 consumer_name,
120 subscription,
121 Some(subscription_type),
122 key_filters,
123 consumer_options,
124 );
125 match topic_consumer.subscribe().await {
126 Ok(_) => Ok(topic_consumer),
127 Err(e) => Err(e),
128 }
129 });
130
131 tasks.push(task);
132 }
133
134 let results = join_all(tasks).await;
136
137 let mut topic_consumers = HashMap::new();
139 for result in results {
140 match result {
141 Ok(Ok(consumer)) => {
142 topic_consumers.insert(
143 consumer.get_topic_name().to_string(),
144 Arc::new(Mutex::new(consumer)),
145 );
146 }
147 Ok(Err(e)) => return Err(e),
148 Err(e) => return Err(DanubeError::Unrecoverable(e.to_string())),
149 }
150 }
151
152 if topic_consumers.is_empty() {
153 return Err(DanubeError::Unrecoverable(format!(
154 "No topics found for '{}'",
155 self.topic_name
156 )));
157 }
158
159 self.consumers.extend(topic_consumers.into_iter());
160 Ok(())
161 }
162
163 pub async fn receive(&mut self) -> Result<mpsc::Receiver<StreamMessage>> {
173 let (tx, rx) = mpsc::channel(RECEIVE_CHANNEL_BUFFER);
174
175 let retry_manager = RetryManager::new(
176 self.consumer_options.max_retries,
177 self.consumer_options.base_backoff_ms,
178 self.consumer_options.max_backoff_ms,
179 );
180
181 for (_, consumer) in &self.consumers {
182 let broker_stop = {
183 let locked = consumer.lock().await;
184 Arc::clone(&locked.stop_signal)
185 };
186 let handle = tokio::spawn(partition_receive_loop(
187 Arc::clone(consumer),
188 tx.clone(),
189 retry_manager.clone(),
190 self.shutdown.clone(),
191 broker_stop,
192 ));
193 self.task_handles.push(handle);
194 }
195
196 Ok(rx)
197 }
198
199 pub async fn ack(&mut self, message: &StreamMessage) -> Result<()> {
200 let topic_name = message.msg_id.topic_name.clone();
201 let topic_consumer = self.consumers.get_mut(&topic_name);
202 if let Some(topic_consumer) = topic_consumer {
203 let mut topic_consumer = topic_consumer.lock().await;
204 let _ = topic_consumer
205 .send_ack(
206 message.request_id,
207 message.msg_id.clone(),
208 &self.subscription,
209 )
210 .await?;
211 }
212 Ok(())
213 }
214
215 pub async fn nack(
216 &mut self,
217 message: &StreamMessage,
218 delay_ms: Option<u64>,
219 reason: Option<String>,
220 ) -> Result<()> {
221 let topic_name = message.msg_id.topic_name.clone();
222 let topic_consumer = self.consumers.get_mut(&topic_name);
223 if let Some(topic_consumer) = topic_consumer {
224 let mut topic_consumer = topic_consumer.lock().await;
225 let _ = topic_consumer
226 .send_nack(
227 message.request_id,
228 message.msg_id.clone(),
229 &self.subscription,
230 delay_ms,
231 reason,
232 )
233 .await?;
234 }
235 Ok(())
236 }
237
238 pub async fn close(&mut self) {
240 self.shutdown.store(true, Ordering::SeqCst);
242 for (_, topic_consumer) in self.consumers.iter() {
244 let locked = topic_consumer.lock().await;
245 locked.stop();
246 }
247 for handle in self.task_handles.drain(..) {
249 handle.abort();
250 }
251 tokio::time::sleep(std::time::Duration::from_millis(GRACEFUL_CLOSE_DELAY_MS)).await;
253 }
254}
255
256async fn partition_receive_loop(
259 consumer: Arc<Mutex<TopicConsumer>>,
260 tx: mpsc::Sender<StreamMessage>,
261 retry_manager: RetryManager,
262 shutdown: Arc<AtomicBool>,
263 broker_stop: Arc<AtomicBool>,
264) {
265 let mut attempts = 0;
266
267 loop {
268 if shutdown.load(Ordering::SeqCst) {
269 return;
270 }
271
272 let stream_result = {
273 let mut locked = consumer.lock().await;
274 locked.receive().await
275 };
276
277 match stream_result {
278 Ok(mut stream) => {
279 attempts = 0;
280
281 while !shutdown.load(Ordering::SeqCst) && !broker_stop.load(Ordering::Relaxed) {
282 match stream.next().await {
283 Some(Ok(stream_message)) => {
284 let message: StreamMessage = stream_message.into();
285 if tx.send(message).await.is_err() {
286 return; }
288 }
289 Some(Err(e)) => {
290 warn!(error = %e, "error receiving message");
291 break; }
293 None => break, }
295 }
296
297 if shutdown.load(Ordering::SeqCst) {
298 return;
299 }
300
301 if broker_stop.load(Ordering::Relaxed) {
303 broker_stop.store(false, Ordering::Relaxed);
304 warn!("broker signaled topic close, triggering resubscription");
305 match resubscribe(&consumer).await {
306 Ok(_) => {
307 info!("resubscription successful after broker close signal");
308 continue;
309 }
310 Err(e) => {
311 error!(error = ?e, "resubscription failed after broker close signal");
312 return;
313 }
314 }
315 }
316 }
317
318 Err(ref error) if matches!(error, DanubeError::Unrecoverable(_)) => {
320 if shutdown.load(Ordering::SeqCst) {
321 return;
322 }
323 warn!(error = ?error, "unrecoverable error, attempting resubscription");
324 match resubscribe(&consumer).await {
325 Ok(_) => {
326 info!("resubscription successful after unrecoverable error");
327 attempts = 0;
328 continue;
329 }
330 Err(e) => {
331 error!(error = ?e, "resubscription failed after unrecoverable error");
332 return;
333 }
334 }
335 }
336
337 Err(error) if retry_manager.is_retryable_error(&error) => {
339 if shutdown.load(Ordering::SeqCst) {
340 return;
341 }
342 attempts += 1;
343 if attempts > retry_manager.max_retries() {
344 warn!("max retries exceeded, attempting resubscription");
345 match resubscribe(&consumer).await {
346 Ok(_) => {
347 info!("resubscription successful");
348 attempts = 0;
349 continue;
350 }
351 Err(e) => {
352 error!(error = ?e, "resubscription failed");
353 return;
354 }
355 }
356 }
357 let backoff = retry_manager.calculate_backoff(attempts - 1);
358 tokio::time::sleep(backoff).await;
359 }
360
361 Err(error) => {
363 error!(error = ?error, "non-retryable error in consumer receive");
364 return;
365 }
366 }
367 }
368}
369
370async fn resubscribe(consumer: &Arc<Mutex<TopicConsumer>>) -> Result<()> {
372 let mut locked = consumer.lock().await;
373 locked.subscribe().await?;
374 Ok(())
375}
376
377#[derive(Debug, Clone)]
382pub struct ConsumerBuilder {
383 client: DanubeClient,
384 topic: Option<String>,
385 consumer_name: Option<String>,
386 subscription: Option<String>,
387 subscription_type: Option<SubType>,
388 consumer_options: ConsumerOptions,
389 key_filters: Vec<String>,
390}
391
392impl ConsumerBuilder {
393 pub fn new(client: &DanubeClient) -> Self {
394 ConsumerBuilder {
395 client: client.clone(),
396 topic: None,
397 consumer_name: None,
398 subscription: None,
399 subscription_type: None,
400 consumer_options: ConsumerOptions::default(),
401 key_filters: Vec::new(),
402 }
403 }
404
405 pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
413 self.topic = Some(topic.into());
414 self
415 }
416
417 pub fn with_consumer_name(mut self, consumer_name: impl Into<String>) -> Self {
425 self.consumer_name = Some(consumer_name.into());
426 self
427 }
428
429 pub fn with_subscription(mut self, subscription_name: impl Into<String>) -> Self {
437 self.subscription = Some(subscription_name.into());
438 self
439 }
440
441 pub fn with_subscription_type(mut self, subscription_type: SubType) -> Self {
452 self.subscription_type = Some(subscription_type);
453 self
454 }
455
456 pub fn with_options(mut self, options: ConsumerOptions) -> Self {
458 self.consumer_options = options;
459 self
460 }
461
462 pub fn with_key_filter(mut self, pattern: impl Into<String>) -> Self {
465 self.key_filters.push(pattern.into());
466 self
467 }
468
469 pub fn with_key_filters(mut self, patterns: Vec<String>) -> Self {
471 self.key_filters.extend(patterns);
472 self
473 }
474
475 pub fn build(self) -> Result<Consumer> {
483 let topic = self.topic.ok_or_else(|| {
484 DanubeError::Unrecoverable("topic is required to build a Consumer".into())
485 })?;
486 let consumer_name = self.consumer_name.ok_or_else(|| {
487 DanubeError::Unrecoverable("consumer name is required to build a Consumer".into())
488 })?;
489 let subscription = self.subscription.ok_or_else(|| {
490 DanubeError::Unrecoverable("subscription is required to build a Consumer".into())
491 })?;
492 Ok(Consumer::new(
493 self.client,
494 topic,
495 consumer_name,
496 subscription,
497 self.subscription_type,
498 self.consumer_options,
499 self.key_filters,
500 ))
501 }
502}
503
/// Retry and backoff settings for a consumer.
///
/// The three values are passed as-is to `RetryManager::new` when the receive
/// loops are started. `Default` yields zero for every field; how the retry
/// manager interprets zeros is not visible from this file.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate has to build
/// it through [`ConsumerOptions::new`] or `Default`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct ConsumerOptions {
    /// Maximum number of retries handed to the retry manager.
    pub max_retries: usize,
    /// Base backoff, in milliseconds, handed to the retry manager.
    pub base_backoff_ms: u64,
    /// Upper backoff bound, in milliseconds, handed to the retry manager.
    pub max_backoff_ms: u64,
}

impl ConsumerOptions {
    /// Creates options from explicit values.
    ///
    /// This is a `const fn`, so it can initialise `const` and `static` items.
    pub const fn new(max_retries: usize, base_backoff_ms: u64, max_backoff_ms: u64) -> Self {
        Self {
            max_retries,
            base_backoff_ms,
            max_backoff_ms,
        }
    }
}