1mod protocol;
21mod runtime;
22mod scheduler;
23mod share;
24mod state;
25
26use std::collections::BTreeSet;
27use std::time::Duration;
28
29use tokio::sync::{mpsc, oneshot};
30use tokio::task::JoinHandle;
31use tracing::{debug, instrument};
32
33use crate::config::ConsumerConfig;
34use crate::types::{
35 CommitOffset, ConsumerGroupMetadata, ConsumerRecords, SubscriptionPattern, TopicPartition,
36 TopicPartitionInfo, TopicPartitionOffset, TopicPartitionOffsetAndTimestamp,
37 TopicPartitionTimestamp,
38};
39use crate::{ConsumerError, Result};
40
41use runtime::ConsumerRuntime;
42
43pub struct KafkaConsumer {
45 application_event_handler: ConsumerApplicationEventHandler,
46 join: JoinHandle<()>,
47 default_poll_timeout: Duration,
48}
49
50pub use share::{
51 AcknowledgeType, AcknowledgementCommitCallback, KafkaShareConsumer, ShareAcknowledgementCommit,
52 ShareAcquireMode, ShareConsumerOptions, ShareRecord, ShareRecords,
53};
54
55impl KafkaConsumer {
56 #[instrument(
57 name = "consumer.connect",
58 level = "debug",
59 skip(config),
60 fields(
61 bootstrap_server_count = config.bootstrap_servers.len(),
62 client_id = %config.client_id,
63 group_id = %config.group_id
64 )
65 )]
66 pub async fn connect(config: ConsumerConfig) -> Result<Self> {
68 let (tx, rx) = mpsc::channel(64);
69 let runtime = ConsumerRuntime::new(config);
70 let join = tokio::spawn(async move {
71 runtime.run(rx).await;
72 });
73
74 let consumer = Self {
75 application_event_handler: ConsumerApplicationEventHandler::new(tx),
76 join,
77 default_poll_timeout: Duration::from_millis(100),
78 };
79 if let Err(error) = consumer.warm_up().await {
80 consumer.join.abort();
81 return Err(error);
82 }
83 debug!("consumer connected");
84 Ok(consumer)
85 }
86
87 pub fn with_default_poll_timeout(mut self, timeout: Duration) -> Self {
89 self.default_poll_timeout = timeout;
90 self
91 }
92
93 #[instrument(
94 name = "consumer.subscribe",
95 level = "debug",
96 skip(self, topics),
97 fields(topic_count = topics.len())
98 )]
99 pub async fn subscribe(&self, topics: Vec<String>) -> Result<()> {
101 let (reply_tx, reply_rx) = oneshot::channel();
102 self.application_event_handler
103 .send(
104 ConsumerRuntimeEvent::Subscribe {
105 topics,
106 reply: reply_tx,
107 },
108 ConsumerError::ThreadStoppedBefore {
109 operation: "subscribe",
110 },
111 )
112 .await?;
113 reply_rx
114 .await
115 .map_err(|_| ConsumerError::ThreadStoppedDuring {
116 operation: "subscribe",
117 })?
118 }
119
120 #[instrument(
121 name = "consumer.subscribe_pattern",
122 level = "debug",
123 skip(self, pattern),
124 fields(pattern = %pattern.pattern())
125 )]
126 pub async fn subscribe_pattern(&self, pattern: SubscriptionPattern) -> Result<()> {
128 let (reply_tx, reply_rx) = oneshot::channel();
129 self.application_event_handler
130 .send(
131 ConsumerRuntimeEvent::SubscribePattern {
132 pattern,
133 reply: reply_tx,
134 },
135 ConsumerError::ThreadStoppedBefore {
136 operation: "subscribe_pattern",
137 },
138 )
139 .await?;
140 reply_rx
141 .await
142 .map_err(|_| ConsumerError::ThreadStoppedDuring {
143 operation: "subscribe_pattern",
144 })?
145 }
146
147 #[instrument(
148 name = "consumer.subscribe_regex",
149 level = "debug",
150 skip(self, pattern),
151 fields(pattern = %pattern)
152 )]
153 pub async fn subscribe_regex(&self, pattern: String) -> Result<()> {
155 let (reply_tx, reply_rx) = oneshot::channel();
156 self.application_event_handler
157 .send(
158 ConsumerRuntimeEvent::SubscribeRegex {
159 pattern,
160 reply: reply_tx,
161 },
162 ConsumerError::ThreadStoppedBefore {
163 operation: "subscribe_regex",
164 },
165 )
166 .await?;
167 reply_rx
168 .await
169 .map_err(|_| ConsumerError::ThreadStoppedDuring {
170 operation: "subscribe_regex",
171 })?
172 }
173
174 #[instrument(name = "consumer.unsubscribe", level = "debug", skip(self))]
175 pub async fn unsubscribe(&self) -> Result<()> {
177 let (reply_tx, reply_rx) = oneshot::channel();
178 self.application_event_handler
179 .send(
180 ConsumerRuntimeEvent::Unsubscribe { reply: reply_tx },
181 ConsumerError::ThreadStoppedBefore {
182 operation: "unsubscribe",
183 },
184 )
185 .await?;
186 reply_rx
187 .await
188 .map_err(|_| ConsumerError::ThreadStoppedDuring {
189 operation: "unsubscribe",
190 })?
191 }
192
193 pub async fn poll(&self) -> Result<ConsumerRecords> {
195 self.poll_for(self.default_poll_timeout).await
196 }
197
198 #[instrument(
199 name = "consumer.poll",
200 level = "debug",
201 skip(self),
202 fields(timeout_ms = timeout.as_millis())
203 )]
204 pub async fn poll_for(&self, timeout: Duration) -> Result<ConsumerRecords> {
206 let (reply_tx, reply_rx) = oneshot::channel();
207 self.application_event_handler
208 .send(
209 ConsumerRuntimeEvent::Poll {
210 timeout,
211 reply: reply_tx,
212 },
213 ConsumerError::ThreadStoppedBefore { operation: "poll" },
214 )
215 .await?;
216 reply_rx
217 .await
218 .map_err(|_| ConsumerError::ThreadStoppedDuring { operation: "poll" })?
219 }
220
221 #[instrument(
222 name = "consumer.assign",
223 level = "debug",
224 skip(self, partitions),
225 fields(partition_count = partitions.len())
226 )]
227 pub async fn assign(&self, partitions: Vec<TopicPartition>) -> Result<()> {
229 let (reply_tx, reply_rx) = oneshot::channel();
230 self.application_event_handler
231 .send(
232 ConsumerRuntimeEvent::Assign {
233 partitions,
234 reply: reply_tx,
235 },
236 ConsumerError::ThreadStoppedBefore {
237 operation: "assign",
238 },
239 )
240 .await?;
241 reply_rx
242 .await
243 .map_err(|_| ConsumerError::ThreadStoppedDuring {
244 operation: "assign",
245 })?
246 }
247
248 #[instrument(
249 name = "consumer.seek",
250 level = "debug",
251 skip(self, partition),
252 fields(topic = %partition.topic, partition_id = partition.partition, offset)
253 )]
254 pub async fn seek(&self, partition: TopicPartition, offset: i64) -> Result<()> {
256 let (reply_tx, reply_rx) = oneshot::channel();
257 self.application_event_handler
258 .send(
259 ConsumerRuntimeEvent::Seek {
260 partition,
261 offset,
262 reply: reply_tx,
263 },
264 ConsumerError::ThreadStoppedBefore { operation: "seek" },
265 )
266 .await?;
267 reply_rx
268 .await
269 .map_err(|_| ConsumerError::ThreadStoppedDuring { operation: "seek" })?
270 }
271
272 #[instrument(
273 name = "consumer.seek_to_beginning",
274 level = "debug",
275 skip(self, partitions),
276 fields(partition_count = partitions.len())
277 )]
278 pub async fn seek_to_beginning(&self, partitions: Vec<TopicPartition>) -> Result<()> {
280 let (reply_tx, reply_rx) = oneshot::channel();
281 self.application_event_handler
282 .send(
283 ConsumerRuntimeEvent::SeekToBeginning {
284 partitions,
285 reply: reply_tx,
286 },
287 ConsumerError::ThreadStoppedBefore {
288 operation: "seek_to_beginning",
289 },
290 )
291 .await?;
292 reply_rx
293 .await
294 .map_err(|_| ConsumerError::ThreadStoppedDuring {
295 operation: "seek_to_beginning",
296 })?
297 }
298
299 #[instrument(
300 name = "consumer.seek_to_end",
301 level = "debug",
302 skip(self, partitions),
303 fields(partition_count = partitions.len())
304 )]
305 pub async fn seek_to_end(&self, partitions: Vec<TopicPartition>) -> Result<()> {
307 let (reply_tx, reply_rx) = oneshot::channel();
308 self.application_event_handler
309 .send(
310 ConsumerRuntimeEvent::SeekToEnd {
311 partitions,
312 reply: reply_tx,
313 },
314 ConsumerError::ThreadStoppedBefore {
315 operation: "seek_to_end",
316 },
317 )
318 .await?;
319 reply_rx
320 .await
321 .map_err(|_| ConsumerError::ThreadStoppedDuring {
322 operation: "seek_to_end",
323 })?
324 }
325
326 #[instrument(
327 name = "consumer.seek_to_timestamp",
328 level = "debug",
329 skip(self, partitions),
330 fields(partition_count = partitions.len())
331 )]
332 pub async fn seek_to_timestamp(&self, partitions: Vec<TopicPartitionTimestamp>) -> Result<()> {
334 let (reply_tx, reply_rx) = oneshot::channel();
335 self.application_event_handler
336 .send(
337 ConsumerRuntimeEvent::SeekToTimestamp {
338 partitions,
339 reply: reply_tx,
340 },
341 ConsumerError::ThreadStoppedBefore {
342 operation: "seek_to_timestamp",
343 },
344 )
345 .await?;
346 reply_rx
347 .await
348 .map_err(|_| ConsumerError::ThreadStoppedDuring {
349 operation: "seek_to_timestamp",
350 })?
351 }
352
353 #[instrument(
354 name = "consumer.position",
355 level = "debug",
356 skip(self, partition),
357 fields(topic = %partition.topic, partition_id = partition.partition)
358 )]
359 pub async fn position(&self, partition: TopicPartition) -> Result<i64> {
361 let (reply_tx, reply_rx) = oneshot::channel();
362 self.application_event_handler
363 .send(
364 ConsumerRuntimeEvent::Position {
365 partition,
366 reply: reply_tx,
367 },
368 ConsumerError::ThreadStoppedBefore {
369 operation: "position",
370 },
371 )
372 .await?;
373 reply_rx
374 .await
375 .map_err(|_| ConsumerError::ThreadStoppedDuring {
376 operation: "position",
377 })?
378 }
379
380 #[instrument(
381 name = "consumer.pause",
382 level = "debug",
383 skip(self, partitions),
384 fields(partition_count = partitions.len())
385 )]
386 pub async fn pause(&self, partitions: Vec<TopicPartition>) -> Result<()> {
388 let (reply_tx, reply_rx) = oneshot::channel();
389 self.application_event_handler
390 .send(
391 ConsumerRuntimeEvent::Pause {
392 partitions,
393 reply: reply_tx,
394 },
395 ConsumerError::ThreadStoppedBefore { operation: "pause" },
396 )
397 .await?;
398 reply_rx
399 .await
400 .map_err(|_| ConsumerError::ThreadStoppedDuring { operation: "pause" })?
401 }
402
403 #[instrument(
404 name = "consumer.resume",
405 level = "debug",
406 skip(self, partitions),
407 fields(partition_count = partitions.len())
408 )]
409 pub async fn resume(&self, partitions: Vec<TopicPartition>) -> Result<()> {
411 let (reply_tx, reply_rx) = oneshot::channel();
412 self.application_event_handler
413 .send(
414 ConsumerRuntimeEvent::Resume {
415 partitions,
416 reply: reply_tx,
417 },
418 ConsumerError::ThreadStoppedBefore {
419 operation: "resume",
420 },
421 )
422 .await?;
423 reply_rx
424 .await
425 .map_err(|_| ConsumerError::ThreadStoppedDuring {
426 operation: "resume",
427 })?
428 }
429
430 #[instrument(name = "consumer.group_metadata", level = "debug", skip(self))]
431 pub async fn group_metadata(&self) -> Result<ConsumerGroupMetadata> {
433 let (reply_tx, reply_rx) = oneshot::channel();
434 self.application_event_handler
435 .send(
436 ConsumerRuntimeEvent::GroupMetadata { reply: reply_tx },
437 ConsumerError::ThreadStoppedBefore {
438 operation: "group_metadata",
439 },
440 )
441 .await?;
442 reply_rx
443 .await
444 .map_err(|_| ConsumerError::ThreadStoppedDuring {
445 operation: "group_metadata",
446 })?
447 }
448
449 #[instrument(name = "consumer.assignment", level = "debug", skip(self))]
450 pub async fn assignment(&self) -> Result<BTreeSet<TopicPartition>> {
452 let (reply_tx, reply_rx) = oneshot::channel();
453 self.application_event_handler
454 .send(
455 ConsumerRuntimeEvent::Assignment { reply: reply_tx },
456 ConsumerError::ThreadStoppedBefore {
457 operation: "assignment",
458 },
459 )
460 .await?;
461 reply_rx
462 .await
463 .map_err(|_| ConsumerError::ThreadStoppedDuring {
464 operation: "assignment",
465 })?
466 }
467
468 #[instrument(
469 name = "consumer.committed_offsets",
470 level = "debug",
471 skip(self, partitions),
472 fields(partition_count = partitions.len())
473 )]
474 pub async fn committed(
476 &self,
477 partitions: Vec<TopicPartition>,
478 ) -> Result<Vec<TopicPartitionOffset>> {
479 let (reply_tx, reply_rx) = oneshot::channel();
480 self.application_event_handler
481 .send(
482 ConsumerRuntimeEvent::Committed {
483 partitions,
484 reply: reply_tx,
485 },
486 ConsumerError::ThreadStoppedBefore {
487 operation: "committed",
488 },
489 )
490 .await?;
491 reply_rx
492 .await
493 .map_err(|_| ConsumerError::ThreadStoppedDuring {
494 operation: "committed",
495 })?
496 }
497
498 #[instrument(
499 name = "consumer.beginning_offsets",
500 level = "debug",
501 skip(self, partitions),
502 fields(partition_count = partitions.len())
503 )]
504 pub async fn beginning_offsets(
506 &self,
507 partitions: Vec<TopicPartition>,
508 ) -> Result<Vec<TopicPartitionOffset>> {
509 let (reply_tx, reply_rx) = oneshot::channel();
510 self.application_event_handler
511 .send(
512 ConsumerRuntimeEvent::BeginningOffsets {
513 partitions,
514 reply: reply_tx,
515 },
516 ConsumerError::ThreadStoppedBefore {
517 operation: "beginning_offsets",
518 },
519 )
520 .await?;
521 reply_rx
522 .await
523 .map_err(|_| ConsumerError::ThreadStoppedDuring {
524 operation: "beginning_offsets",
525 })?
526 }
527
528 #[instrument(
529 name = "consumer.end_offsets",
530 level = "debug",
531 skip(self, partitions),
532 fields(partition_count = partitions.len())
533 )]
534 pub async fn end_offsets(
536 &self,
537 partitions: Vec<TopicPartition>,
538 ) -> Result<Vec<TopicPartitionOffset>> {
539 let (reply_tx, reply_rx) = oneshot::channel();
540 self.application_event_handler
541 .send(
542 ConsumerRuntimeEvent::EndOffsets {
543 partitions,
544 reply: reply_tx,
545 },
546 ConsumerError::ThreadStoppedBefore {
547 operation: "end_offsets",
548 },
549 )
550 .await?;
551 reply_rx
552 .await
553 .map_err(|_| ConsumerError::ThreadStoppedDuring {
554 operation: "end_offsets",
555 })?
556 }
557
558 #[instrument(
559 name = "consumer.offsets_for_times",
560 level = "debug",
561 skip(self, partitions),
562 fields(partition_count = partitions.len())
563 )]
564 pub async fn offsets_for_times(
566 &self,
567 partitions: Vec<TopicPartitionTimestamp>,
568 ) -> Result<Vec<TopicPartitionOffsetAndTimestamp>> {
569 let (reply_tx, reply_rx) = oneshot::channel();
570 self.application_event_handler
571 .send(
572 ConsumerRuntimeEvent::OffsetsForTimes {
573 partitions,
574 reply: reply_tx,
575 },
576 ConsumerError::ThreadStoppedBefore {
577 operation: "offsets_for_times",
578 },
579 )
580 .await?;
581 reply_rx
582 .await
583 .map_err(|_| ConsumerError::ThreadStoppedDuring {
584 operation: "offsets_for_times",
585 })?
586 }
587
588 #[instrument(
589 name = "consumer.partitions_for",
590 level = "debug",
591 skip(self),
592 fields(topic = %topic)
593 )]
594 pub async fn partitions_for(&self, topic: String) -> Result<Vec<TopicPartitionInfo>> {
596 let (reply_tx, reply_rx) = oneshot::channel();
597 self.application_event_handler
598 .send(
599 ConsumerRuntimeEvent::PartitionsFor {
600 topic,
601 reply: reply_tx,
602 },
603 ConsumerError::ThreadStoppedBefore {
604 operation: "partitions_for",
605 },
606 )
607 .await?;
608 reply_rx
609 .await
610 .map_err(|_| ConsumerError::ThreadStoppedDuring {
611 operation: "partitions_for",
612 })?
613 }
614
615 #[instrument(name = "consumer.list_topics", level = "debug", skip(self))]
616 pub async fn list_topics(&self) -> Result<Vec<String>> {
618 let (reply_tx, reply_rx) = oneshot::channel();
619 self.application_event_handler
620 .send(
621 ConsumerRuntimeEvent::ListTopics { reply: reply_tx },
622 ConsumerError::ThreadStoppedBefore {
623 operation: "list_topics",
624 },
625 )
626 .await?;
627 reply_rx
628 .await
629 .map_err(|_| ConsumerError::ThreadStoppedDuring {
630 operation: "list_topics",
631 })?
632 }
633
634 pub async fn commit(&self, records: &ConsumerRecords) -> Result<()> {
636 self.commit_offsets(records.commit_offsets()).await
637 }
638
639 #[instrument(
640 name = "consumer.commit_offsets",
641 level = "debug",
642 skip(self, offsets),
643 fields(offset_count = offsets.len())
644 )]
645 pub async fn commit_offsets(&self, offsets: Vec<CommitOffset>) -> Result<()> {
647 if offsets.is_empty() {
648 return Ok(());
649 }
650
651 let (reply_tx, reply_rx) = oneshot::channel();
652 self.application_event_handler
653 .send(
654 ConsumerRuntimeEvent::Commit {
655 offsets,
656 reply: reply_tx,
657 },
658 ConsumerError::ThreadStoppedBefore {
659 operation: "commit",
660 },
661 )
662 .await?;
663 reply_rx
664 .await
665 .map_err(|_| ConsumerError::ThreadStoppedDuring {
666 operation: "commit",
667 })?
668 }
669
670 #[instrument(name = "consumer.wakeup", level = "debug", skip(self))]
671 pub async fn wakeup(&self) -> Result<()> {
673 self.application_event_handler
674 .send(
675 ConsumerRuntimeEvent::Wakeup,
676 ConsumerError::ThreadStoppedBefore {
677 operation: "wakeup",
678 },
679 )
680 .await
681 }
682
683 #[instrument(name = "consumer.shutdown", level = "debug", skip(self))]
684 pub async fn shutdown(self) -> Result<()> {
686 let (reply_tx, reply_rx) = oneshot::channel();
687 self.application_event_handler
688 .send(
689 ConsumerRuntimeEvent::Shutdown { reply: reply_tx },
690 ConsumerError::ThreadStoppedBefore {
691 operation: "shutdown",
692 },
693 )
694 .await?;
695
696 let result = reply_rx
697 .await
698 .map_err(|_| ConsumerError::ThreadStoppedDuring {
699 operation: "shutdown",
700 })?;
701 self.join.await.map_err(ConsumerError::Join)?;
702 result
703 }
704
705 #[instrument(name = "consumer.warm_up", level = "trace", skip(self))]
706 async fn warm_up(&self) -> Result<()> {
707 let (reply_tx, reply_rx) = oneshot::channel();
708 self.application_event_handler
709 .send(
710 ConsumerRuntimeEvent::WarmUp { reply: reply_tx },
711 ConsumerError::ThreadStoppedBefore {
712 operation: "startup",
713 },
714 )
715 .await?;
716 reply_rx
717 .await
718 .map_err(|_| ConsumerError::ThreadStoppedDuring {
719 operation: "startup",
720 })?
721 }
722}
723
724struct ConsumerApplicationEventHandler {
725 tx: mpsc::Sender<ConsumerRuntimeEvent>,
726}
727
728impl ConsumerApplicationEventHandler {
729 fn new(tx: mpsc::Sender<ConsumerRuntimeEvent>) -> Self {
730 Self { tx }
731 }
732
733 async fn send(&self, event: ConsumerRuntimeEvent, stopped_error: ConsumerError) -> Result<()> {
734 self.tx.send(event).await.map_err(|_| stopped_error.into())
735 }
736}
737
738enum ConsumerRuntimeEvent {
739 WarmUp {
740 reply: oneshot::Sender<Result<()>>,
741 },
742 Subscribe {
743 topics: Vec<String>,
744 reply: oneshot::Sender<Result<()>>,
745 },
746 SubscribePattern {
747 pattern: SubscriptionPattern,
748 reply: oneshot::Sender<Result<()>>,
749 },
750 SubscribeRegex {
751 pattern: String,
752 reply: oneshot::Sender<Result<()>>,
753 },
754 Unsubscribe {
755 reply: oneshot::Sender<Result<()>>,
756 },
757 Assign {
758 partitions: Vec<TopicPartition>,
759 reply: oneshot::Sender<Result<()>>,
760 },
761 Poll {
762 timeout: Duration,
763 reply: oneshot::Sender<Result<ConsumerRecords>>,
764 },
765 Seek {
766 partition: TopicPartition,
767 offset: i64,
768 reply: oneshot::Sender<Result<()>>,
769 },
770 SeekToBeginning {
771 partitions: Vec<TopicPartition>,
772 reply: oneshot::Sender<Result<()>>,
773 },
774 SeekToEnd {
775 partitions: Vec<TopicPartition>,
776 reply: oneshot::Sender<Result<()>>,
777 },
778 SeekToTimestamp {
779 partitions: Vec<TopicPartitionTimestamp>,
780 reply: oneshot::Sender<Result<()>>,
781 },
782 Position {
783 partition: TopicPartition,
784 reply: oneshot::Sender<Result<i64>>,
785 },
786 Pause {
787 partitions: Vec<TopicPartition>,
788 reply: oneshot::Sender<Result<()>>,
789 },
790 Resume {
791 partitions: Vec<TopicPartition>,
792 reply: oneshot::Sender<Result<()>>,
793 },
794 GroupMetadata {
795 reply: oneshot::Sender<Result<ConsumerGroupMetadata>>,
796 },
797 Assignment {
798 reply: oneshot::Sender<Result<BTreeSet<TopicPartition>>>,
799 },
800 Committed {
801 partitions: Vec<TopicPartition>,
802 reply: oneshot::Sender<Result<Vec<TopicPartitionOffset>>>,
803 },
804 BeginningOffsets {
805 partitions: Vec<TopicPartition>,
806 reply: oneshot::Sender<Result<Vec<TopicPartitionOffset>>>,
807 },
808 EndOffsets {
809 partitions: Vec<TopicPartition>,
810 reply: oneshot::Sender<Result<Vec<TopicPartitionOffset>>>,
811 },
812 OffsetsForTimes {
813 partitions: Vec<TopicPartitionTimestamp>,
814 reply: oneshot::Sender<Result<Vec<TopicPartitionOffsetAndTimestamp>>>,
815 },
816 PartitionsFor {
817 topic: String,
818 reply: oneshot::Sender<Result<Vec<TopicPartitionInfo>>>,
819 },
820 ListTopics {
821 reply: oneshot::Sender<Result<Vec<String>>>,
822 },
823 Commit {
824 offsets: Vec<CommitOffset>,
825 reply: oneshot::Sender<Result<()>>,
826 },
827 Wakeup,
828 Shutdown {
829 reply: oneshot::Sender<Result<()>>,
830 },
831}