1mod protocol;
21mod runtime;
22mod scheduler;
23mod share;
24mod state;
25mod util;
26
27use std::collections::BTreeSet;
28use std::time::Duration;
29
30use tokio::sync::{mpsc, oneshot};
31use tokio::task::JoinHandle;
32use tracing::{debug, instrument};
33
34use crate::config::ConsumerConfig;
35use crate::types::{
36 CommitOffset, ConsumerGroupMetadata, ConsumerRecords, SubscriptionPattern, TopicPartition,
37 TopicPartitionInfo, TopicPartitionOffset, TopicPartitionOffsetAndTimestamp,
38 TopicPartitionTimestamp,
39};
40use crate::{CancellationToken, ConsumerError, Result};
41
42use runtime::ConsumerRuntime;
43
/// Async handle to a consumer runtime that runs on its own Tokio task.
///
/// Every operation is forwarded to the runtime task as a `ConsumerRuntimeEvent`
/// over a channel; the methods on this type await the runtime's reply.
pub struct KafkaConsumer {
    /// Sending side of the command channel to the runtime task.
    consumer_runtime: ConsumerRuntimeHandle,
    /// Join handle of the spawned runtime task; aborted when warm-up fails in
    /// `connect` and awaited in `shutdown`.
    join: JoinHandle<()>,
    /// Timeout that `poll` passes to `poll_for`.
    default_poll_timeout: Duration,
}
50
51pub use share::{
52 AcknowledgeType, AcknowledgementCommitCallback, KafkaShareConsumer, ShareAcknowledgementCommit,
53 ShareAcquireMode, ShareConsumerOptions, ShareRecord, ShareRecords,
54};
55
56impl KafkaConsumer {
57 #[instrument(
58 name = "consumer.connect",
59 level = "debug",
60 skip(config),
61 fields(
62 bootstrap_server_count = config.bootstrap_servers.len(),
63 client_id = %config.client_id,
64 group_id = %config.group_id
65 )
66 )]
67 pub async fn connect(config: ConsumerConfig) -> Result<Self> {
69 let (tx, rx) = mpsc::channel(64);
70 let runtime = ConsumerRuntime::new(config);
71 let join = tokio::spawn(async move {
72 runtime.run(rx).await;
73 });
74
75 let consumer = Self {
76 consumer_runtime: ConsumerRuntimeHandle::new(tx),
77 join,
78 default_poll_timeout: Duration::from_millis(100),
79 };
80 if let Err(error) = consumer.warm_up().await {
81 consumer.join.abort();
82 return Err(error);
83 }
84 debug!("consumer connected");
85 Ok(consumer)
86 }
87
88 pub fn with_default_poll_timeout(mut self, timeout: Duration) -> Self {
90 self.default_poll_timeout = timeout;
91 self
92 }
93
94 #[instrument(
95 name = "consumer.subscribe",
96 level = "debug",
97 skip(self, topics),
98 fields(topic_count = topics.len())
99 )]
100 pub async fn subscribe(&self, topics: Vec<String>) -> Result<()> {
102 let (reply_tx, reply_rx) = oneshot::channel();
103 self.consumer_runtime
104 .send(
105 ConsumerRuntimeEvent::Subscribe {
106 topics,
107 cancellation: None,
108 reply: reply_tx,
109 },
110 ConsumerError::ThreadStoppedBefore {
111 operation: "subscribe",
112 },
113 )
114 .await?;
115 reply_rx
116 .await
117 .map_err(|_| ConsumerError::ThreadStoppedDuring {
118 operation: "subscribe",
119 })?
120 }
121
122 #[instrument(
123 name = "consumer.subscribe_pattern",
124 level = "debug",
125 skip(self, pattern),
126 fields(pattern = %pattern.pattern())
127 )]
128 pub async fn subscribe_pattern(&self, pattern: SubscriptionPattern) -> Result<()> {
130 let (reply_tx, reply_rx) = oneshot::channel();
131 self.consumer_runtime
132 .send(
133 ConsumerRuntimeEvent::SubscribePattern {
134 pattern,
135 cancellation: None,
136 reply: reply_tx,
137 },
138 ConsumerError::ThreadStoppedBefore {
139 operation: "subscribe_pattern",
140 },
141 )
142 .await?;
143 reply_rx
144 .await
145 .map_err(|_| ConsumerError::ThreadStoppedDuring {
146 operation: "subscribe_pattern",
147 })?
148 }
149
150 #[instrument(
151 name = "consumer.subscribe_regex",
152 level = "debug",
153 skip(self, pattern),
154 fields(pattern = %pattern)
155 )]
156 pub async fn subscribe_regex(&self, pattern: String) -> Result<()> {
158 let (reply_tx, reply_rx) = oneshot::channel();
159 self.consumer_runtime
160 .send(
161 ConsumerRuntimeEvent::SubscribeRegex {
162 pattern,
163 cancellation: None,
164 reply: reply_tx,
165 },
166 ConsumerError::ThreadStoppedBefore {
167 operation: "subscribe_regex",
168 },
169 )
170 .await?;
171 reply_rx
172 .await
173 .map_err(|_| ConsumerError::ThreadStoppedDuring {
174 operation: "subscribe_regex",
175 })?
176 }
177
178 #[instrument(name = "consumer.unsubscribe", level = "debug", skip(self))]
179 pub async fn unsubscribe(&self) -> Result<()> {
181 let (reply_tx, reply_rx) = oneshot::channel();
182 self.consumer_runtime
183 .send(
184 ConsumerRuntimeEvent::Unsubscribe {
185 cancellation: None,
186 reply: reply_tx,
187 },
188 ConsumerError::ThreadStoppedBefore {
189 operation: "unsubscribe",
190 },
191 )
192 .await?;
193 reply_rx
194 .await
195 .map_err(|_| ConsumerError::ThreadStoppedDuring {
196 operation: "unsubscribe",
197 })?
198 }
199
200 pub async fn poll(&self) -> Result<ConsumerRecords> {
202 self.poll_for(self.default_poll_timeout).await
203 }
204
205 #[instrument(
206 name = "consumer.poll",
207 level = "debug",
208 skip(self),
209 fields(timeout_ms = timeout.as_millis())
210 )]
211 pub async fn poll_for(&self, timeout: Duration) -> Result<ConsumerRecords> {
213 let (reply_tx, reply_rx) = oneshot::channel();
214 self.consumer_runtime
215 .send(
216 ConsumerRuntimeEvent::Poll {
217 timeout,
218 cancellation: None,
219 reply: reply_tx,
220 },
221 ConsumerError::ThreadStoppedBefore { operation: "poll" },
222 )
223 .await?;
224 reply_rx
225 .await
226 .map_err(|_| ConsumerError::ThreadStoppedDuring { operation: "poll" })?
227 }
228
229 #[instrument(
230 name = "consumer.assign",
231 level = "debug",
232 skip(self, partitions),
233 fields(partition_count = partitions.len())
234 )]
235 pub async fn assign(&self, partitions: Vec<TopicPartition>) -> Result<()> {
237 let (reply_tx, reply_rx) = oneshot::channel();
238 self.consumer_runtime
239 .send(
240 ConsumerRuntimeEvent::Assign {
241 partitions,
242 cancellation: None,
243 reply: reply_tx,
244 },
245 ConsumerError::ThreadStoppedBefore {
246 operation: "assign",
247 },
248 )
249 .await?;
250 reply_rx
251 .await
252 .map_err(|_| ConsumerError::ThreadStoppedDuring {
253 operation: "assign",
254 })?
255 }
256
257 #[instrument(
258 name = "consumer.seek",
259 level = "debug",
260 skip(self, partition),
261 fields(topic = %partition.topic, partition_id = partition.partition, offset)
262 )]
263 pub async fn seek(&self, partition: TopicPartition, offset: i64) -> Result<()> {
265 let (reply_tx, reply_rx) = oneshot::channel();
266 self.consumer_runtime
267 .send(
268 ConsumerRuntimeEvent::Seek {
269 partition,
270 offset,
271 cancellation: None,
272 reply: reply_tx,
273 },
274 ConsumerError::ThreadStoppedBefore { operation: "seek" },
275 )
276 .await?;
277 reply_rx
278 .await
279 .map_err(|_| ConsumerError::ThreadStoppedDuring { operation: "seek" })?
280 }
281
282 #[instrument(
283 name = "consumer.seek_to_beginning",
284 level = "debug",
285 skip(self, partitions),
286 fields(partition_count = partitions.len())
287 )]
288 pub async fn seek_to_beginning(&self, partitions: Vec<TopicPartition>) -> Result<()> {
290 let (reply_tx, reply_rx) = oneshot::channel();
291 self.consumer_runtime
292 .send(
293 ConsumerRuntimeEvent::SeekToBeginning {
294 partitions,
295 cancellation: None,
296 reply: reply_tx,
297 },
298 ConsumerError::ThreadStoppedBefore {
299 operation: "seek_to_beginning",
300 },
301 )
302 .await?;
303 reply_rx
304 .await
305 .map_err(|_| ConsumerError::ThreadStoppedDuring {
306 operation: "seek_to_beginning",
307 })?
308 }
309
310 #[instrument(
311 name = "consumer.seek_to_end",
312 level = "debug",
313 skip(self, partitions),
314 fields(partition_count = partitions.len())
315 )]
316 pub async fn seek_to_end(&self, partitions: Vec<TopicPartition>) -> Result<()> {
318 let (reply_tx, reply_rx) = oneshot::channel();
319 self.consumer_runtime
320 .send(
321 ConsumerRuntimeEvent::SeekToEnd {
322 partitions,
323 cancellation: None,
324 reply: reply_tx,
325 },
326 ConsumerError::ThreadStoppedBefore {
327 operation: "seek_to_end",
328 },
329 )
330 .await?;
331 reply_rx
332 .await
333 .map_err(|_| ConsumerError::ThreadStoppedDuring {
334 operation: "seek_to_end",
335 })?
336 }
337
338 #[instrument(
339 name = "consumer.seek_to_timestamp",
340 level = "debug",
341 skip(self, partitions),
342 fields(partition_count = partitions.len())
343 )]
344 pub async fn seek_to_timestamp(&self, partitions: Vec<TopicPartitionTimestamp>) -> Result<()> {
346 let (reply_tx, reply_rx) = oneshot::channel();
347 self.consumer_runtime
348 .send(
349 ConsumerRuntimeEvent::SeekToTimestamp {
350 partitions,
351 cancellation: None,
352 reply: reply_tx,
353 },
354 ConsumerError::ThreadStoppedBefore {
355 operation: "seek_to_timestamp",
356 },
357 )
358 .await?;
359 reply_rx
360 .await
361 .map_err(|_| ConsumerError::ThreadStoppedDuring {
362 operation: "seek_to_timestamp",
363 })?
364 }
365
366 #[instrument(
367 name = "consumer.position",
368 level = "debug",
369 skip(self, partition),
370 fields(topic = %partition.topic, partition_id = partition.partition)
371 )]
372 pub async fn position(&self, partition: TopicPartition) -> Result<i64> {
374 let (reply_tx, reply_rx) = oneshot::channel();
375 self.consumer_runtime
376 .send(
377 ConsumerRuntimeEvent::Position {
378 partition,
379 cancellation: None,
380 reply: reply_tx,
381 },
382 ConsumerError::ThreadStoppedBefore {
383 operation: "position",
384 },
385 )
386 .await?;
387 reply_rx
388 .await
389 .map_err(|_| ConsumerError::ThreadStoppedDuring {
390 operation: "position",
391 })?
392 }
393
394 #[instrument(
395 name = "consumer.pause",
396 level = "debug",
397 skip(self, partitions),
398 fields(partition_count = partitions.len())
399 )]
400 pub async fn pause(&self, partitions: Vec<TopicPartition>) -> Result<()> {
402 let (reply_tx, reply_rx) = oneshot::channel();
403 self.consumer_runtime
404 .send(
405 ConsumerRuntimeEvent::Pause {
406 partitions,
407 cancellation: None,
408 reply: reply_tx,
409 },
410 ConsumerError::ThreadStoppedBefore { operation: "pause" },
411 )
412 .await?;
413 reply_rx
414 .await
415 .map_err(|_| ConsumerError::ThreadStoppedDuring { operation: "pause" })?
416 }
417
418 #[instrument(
419 name = "consumer.resume",
420 level = "debug",
421 skip(self, partitions),
422 fields(partition_count = partitions.len())
423 )]
424 pub async fn resume(&self, partitions: Vec<TopicPartition>) -> Result<()> {
426 let (reply_tx, reply_rx) = oneshot::channel();
427 self.consumer_runtime
428 .send(
429 ConsumerRuntimeEvent::Resume {
430 partitions,
431 cancellation: None,
432 reply: reply_tx,
433 },
434 ConsumerError::ThreadStoppedBefore {
435 operation: "resume",
436 },
437 )
438 .await?;
439 reply_rx
440 .await
441 .map_err(|_| ConsumerError::ThreadStoppedDuring {
442 operation: "resume",
443 })?
444 }
445
446 #[instrument(name = "consumer.group_metadata", level = "debug", skip(self))]
447 pub async fn group_metadata(&self) -> Result<ConsumerGroupMetadata> {
449 let (reply_tx, reply_rx) = oneshot::channel();
450 self.consumer_runtime
451 .send(
452 ConsumerRuntimeEvent::GroupMetadata {
453 cancellation: None,
454 reply: reply_tx,
455 },
456 ConsumerError::ThreadStoppedBefore {
457 operation: "group_metadata",
458 },
459 )
460 .await?;
461 reply_rx
462 .await
463 .map_err(|_| ConsumerError::ThreadStoppedDuring {
464 operation: "group_metadata",
465 })?
466 }
467
468 #[instrument(name = "consumer.assignment", level = "debug", skip(self))]
469 pub async fn assignment(&self) -> Result<BTreeSet<TopicPartition>> {
471 let (reply_tx, reply_rx) = oneshot::channel();
472 self.consumer_runtime
473 .send(
474 ConsumerRuntimeEvent::Assignment {
475 cancellation: None,
476 reply: reply_tx,
477 },
478 ConsumerError::ThreadStoppedBefore {
479 operation: "assignment",
480 },
481 )
482 .await?;
483 reply_rx
484 .await
485 .map_err(|_| ConsumerError::ThreadStoppedDuring {
486 operation: "assignment",
487 })?
488 }
489
490 #[instrument(
491 name = "consumer.committed_offsets",
492 level = "debug",
493 skip(self, partitions),
494 fields(partition_count = partitions.len())
495 )]
496 pub async fn committed(
498 &self,
499 partitions: Vec<TopicPartition>,
500 ) -> Result<Vec<TopicPartitionOffset>> {
501 let (reply_tx, reply_rx) = oneshot::channel();
502 self.consumer_runtime
503 .send(
504 ConsumerRuntimeEvent::Committed {
505 partitions,
506 cancellation: None,
507 reply: reply_tx,
508 },
509 ConsumerError::ThreadStoppedBefore {
510 operation: "committed",
511 },
512 )
513 .await?;
514 reply_rx
515 .await
516 .map_err(|_| ConsumerError::ThreadStoppedDuring {
517 operation: "committed",
518 })?
519 }
520
521 #[instrument(
522 name = "consumer.beginning_offsets",
523 level = "debug",
524 skip(self, partitions),
525 fields(partition_count = partitions.len())
526 )]
527 pub async fn beginning_offsets(
529 &self,
530 partitions: Vec<TopicPartition>,
531 ) -> Result<Vec<TopicPartitionOffset>> {
532 let (reply_tx, reply_rx) = oneshot::channel();
533 self.consumer_runtime
534 .send(
535 ConsumerRuntimeEvent::BeginningOffsets {
536 partitions,
537 cancellation: None,
538 reply: reply_tx,
539 },
540 ConsumerError::ThreadStoppedBefore {
541 operation: "beginning_offsets",
542 },
543 )
544 .await?;
545 reply_rx
546 .await
547 .map_err(|_| ConsumerError::ThreadStoppedDuring {
548 operation: "beginning_offsets",
549 })?
550 }
551
552 #[instrument(
553 name = "consumer.end_offsets",
554 level = "debug",
555 skip(self, partitions),
556 fields(partition_count = partitions.len())
557 )]
558 pub async fn end_offsets(
560 &self,
561 partitions: Vec<TopicPartition>,
562 ) -> Result<Vec<TopicPartitionOffset>> {
563 let (reply_tx, reply_rx) = oneshot::channel();
564 self.consumer_runtime
565 .send(
566 ConsumerRuntimeEvent::EndOffsets {
567 partitions,
568 cancellation: None,
569 reply: reply_tx,
570 },
571 ConsumerError::ThreadStoppedBefore {
572 operation: "end_offsets",
573 },
574 )
575 .await?;
576 reply_rx
577 .await
578 .map_err(|_| ConsumerError::ThreadStoppedDuring {
579 operation: "end_offsets",
580 })?
581 }
582
583 #[instrument(
584 name = "consumer.offsets_for_times",
585 level = "debug",
586 skip(self, partitions),
587 fields(partition_count = partitions.len())
588 )]
589 pub async fn offsets_for_times(
591 &self,
592 partitions: Vec<TopicPartitionTimestamp>,
593 ) -> Result<Vec<TopicPartitionOffsetAndTimestamp>> {
594 let (reply_tx, reply_rx) = oneshot::channel();
595 self.consumer_runtime
596 .send(
597 ConsumerRuntimeEvent::OffsetsForTimes {
598 partitions,
599 cancellation: None,
600 reply: reply_tx,
601 },
602 ConsumerError::ThreadStoppedBefore {
603 operation: "offsets_for_times",
604 },
605 )
606 .await?;
607 reply_rx
608 .await
609 .map_err(|_| ConsumerError::ThreadStoppedDuring {
610 operation: "offsets_for_times",
611 })?
612 }
613
614 #[instrument(
615 name = "consumer.partitions_for",
616 level = "debug",
617 skip(self),
618 fields(topic = %topic)
619 )]
620 pub async fn partitions_for(&self, topic: String) -> Result<Vec<TopicPartitionInfo>> {
622 let (reply_tx, reply_rx) = oneshot::channel();
623 self.consumer_runtime
624 .send(
625 ConsumerRuntimeEvent::PartitionsFor {
626 topic,
627 cancellation: None,
628 reply: reply_tx,
629 },
630 ConsumerError::ThreadStoppedBefore {
631 operation: "partitions_for",
632 },
633 )
634 .await?;
635 reply_rx
636 .await
637 .map_err(|_| ConsumerError::ThreadStoppedDuring {
638 operation: "partitions_for",
639 })?
640 }
641
642 #[instrument(name = "consumer.list_topics", level = "debug", skip(self))]
643 pub async fn list_topics(&self) -> Result<Vec<String>> {
645 let (reply_tx, reply_rx) = oneshot::channel();
646 self.consumer_runtime
647 .send(
648 ConsumerRuntimeEvent::ListTopics {
649 cancellation: None,
650 reply: reply_tx,
651 },
652 ConsumerError::ThreadStoppedBefore {
653 operation: "list_topics",
654 },
655 )
656 .await?;
657 reply_rx
658 .await
659 .map_err(|_| ConsumerError::ThreadStoppedDuring {
660 operation: "list_topics",
661 })?
662 }
663
664 pub async fn commit(&self, records: &ConsumerRecords) -> Result<()> {
666 self.commit_offsets(records.commit_offsets()).await
667 }
668
669 #[instrument(
670 name = "consumer.commit_offsets",
671 level = "debug",
672 skip(self, offsets),
673 fields(offset_count = offsets.len())
674 )]
675 pub async fn commit_offsets(&self, offsets: Vec<CommitOffset>) -> Result<()> {
677 if offsets.is_empty() {
678 return Ok(());
679 }
680
681 let (reply_tx, reply_rx) = oneshot::channel();
682 self.consumer_runtime
683 .send(
684 ConsumerRuntimeEvent::Commit {
685 offsets,
686 cancellation: None,
687 reply: reply_tx,
688 },
689 ConsumerError::ThreadStoppedBefore {
690 operation: "commit",
691 },
692 )
693 .await?;
694 reply_rx
695 .await
696 .map_err(|_| ConsumerError::ThreadStoppedDuring {
697 operation: "commit",
698 })?
699 }
700
701 #[instrument(name = "consumer.wakeup", level = "debug", skip(self))]
702 pub async fn wakeup(&self) -> Result<()> {
704 self.consumer_runtime
705 .send(
706 ConsumerRuntimeEvent::Wakeup,
707 ConsumerError::ThreadStoppedBefore {
708 operation: "wakeup",
709 },
710 )
711 .await
712 }
713
714 #[instrument(name = "consumer.shutdown", level = "debug", skip(self))]
715 pub async fn shutdown(self) -> Result<()> {
717 let (reply_tx, reply_rx) = oneshot::channel();
718 self.consumer_runtime
719 .send(
720 ConsumerRuntimeEvent::Shutdown { reply: reply_tx },
721 ConsumerError::ThreadStoppedBefore {
722 operation: "shutdown",
723 },
724 )
725 .await?;
726
727 let result = reply_rx
728 .await
729 .map_err(|_| ConsumerError::ThreadStoppedDuring {
730 operation: "shutdown",
731 })?;
732 self.join.await.map_err(ConsumerError::Join)?;
733 result
734 }
735
736 #[instrument(name = "consumer.warm_up", level = "trace", skip(self))]
737 async fn warm_up(&self) -> Result<()> {
738 let (reply_tx, reply_rx) = oneshot::channel();
739 self.consumer_runtime
740 .send(
741 ConsumerRuntimeEvent::WarmUp {
742 cancellation: None,
743 reply: reply_tx,
744 },
745 ConsumerError::ThreadStoppedBefore {
746 operation: "startup",
747 },
748 )
749 .await?;
750 reply_rx
751 .await
752 .map_err(|_| ConsumerError::ThreadStoppedDuring {
753 operation: "startup",
754 })?
755 }
756}
757
/// Thin wrapper around the sending half of the runtime's command channel.
struct ConsumerRuntimeHandle {
    /// Sender used to deliver `ConsumerRuntimeEvent`s to the runtime task.
    tx: mpsc::Sender<ConsumerRuntimeEvent>,
}
761
762impl ConsumerRuntimeHandle {
763 fn new(tx: mpsc::Sender<ConsumerRuntimeEvent>) -> Self {
764 Self { tx }
765 }
766
767 async fn send(&self, event: ConsumerRuntimeEvent, stopped_error: ConsumerError) -> Result<()> {
768 self.tx.send(event).await.map_err(|_| stopped_error.into())
769 }
770}
771
/// Commands sent from the consumer handle to the runtime task.
///
/// Except for `Wakeup`, every variant carries a oneshot `reply` sender on which
/// the result of the operation is delivered. Except for `Wakeup` and `Shutdown`,
/// every variant also carries an optional `cancellation` token that
/// `is_cancelled` inspects; the `KafkaConsumer` methods in this file always
/// pass `None` for it.
enum ConsumerRuntimeEvent {
    /// Sent by `KafkaConsumer::warm_up` during `connect`.
    WarmUp {
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Sent by `KafkaConsumer::subscribe` with the list of topic names.
    Subscribe {
        topics: Vec<String>,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Sent by `KafkaConsumer::subscribe_pattern`.
    SubscribePattern {
        pattern: SubscriptionPattern,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Sent by `KafkaConsumer::subscribe_regex` with the pattern as a string.
    SubscribeRegex {
        pattern: String,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Sent by `KafkaConsumer::unsubscribe`.
    Unsubscribe {
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Sent by `KafkaConsumer::assign` with the partitions to assign.
    Assign {
        partitions: Vec<TopicPartition>,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Sent by `KafkaConsumer::poll_for`; replies with the polled records.
    Poll {
        timeout: Duration,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<ConsumerRecords>>,
    },
    /// Sent by `KafkaConsumer::seek` with a single partition and offset.
    Seek {
        partition: TopicPartition,
        offset: i64,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Sent by `KafkaConsumer::seek_to_beginning`.
    SeekToBeginning {
        partitions: Vec<TopicPartition>,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Sent by `KafkaConsumer::seek_to_end`.
    SeekToEnd {
        partitions: Vec<TopicPartition>,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Sent by `KafkaConsumer::seek_to_timestamp`.
    SeekToTimestamp {
        partitions: Vec<TopicPartitionTimestamp>,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Sent by `KafkaConsumer::position`; replies with an `i64`.
    Position {
        partition: TopicPartition,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<i64>>,
    },
    /// Sent by `KafkaConsumer::pause`.
    Pause {
        partitions: Vec<TopicPartition>,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Sent by `KafkaConsumer::resume`.
    Resume {
        partitions: Vec<TopicPartition>,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Sent by `KafkaConsumer::group_metadata`.
    GroupMetadata {
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<ConsumerGroupMetadata>>,
    },
    /// Sent by `KafkaConsumer::assignment`; replies with a set of partitions.
    Assignment {
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<BTreeSet<TopicPartition>>>,
    },
    /// Sent by `KafkaConsumer::committed`.
    Committed {
        partitions: Vec<TopicPartition>,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<Vec<TopicPartitionOffset>>>,
    },
    /// Sent by `KafkaConsumer::beginning_offsets`.
    BeginningOffsets {
        partitions: Vec<TopicPartition>,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<Vec<TopicPartitionOffset>>>,
    },
    /// Sent by `KafkaConsumer::end_offsets`.
    EndOffsets {
        partitions: Vec<TopicPartition>,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<Vec<TopicPartitionOffset>>>,
    },
    /// Sent by `KafkaConsumer::offsets_for_times`.
    OffsetsForTimes {
        partitions: Vec<TopicPartitionTimestamp>,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<Vec<TopicPartitionOffsetAndTimestamp>>>,
    },
    /// Sent by `KafkaConsumer::partitions_for` with a single topic name.
    PartitionsFor {
        topic: String,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<Vec<TopicPartitionInfo>>>,
    },
    /// Sent by `KafkaConsumer::list_topics`; replies with topic names.
    ListTopics {
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<Vec<String>>>,
    },
    /// Sent by `KafkaConsumer::commit_offsets` (operation name `"commit"`).
    Commit {
        offsets: Vec<CommitOffset>,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Sent by `KafkaConsumer::wakeup`; carries no reply channel.
    Wakeup,
    /// Sent by `KafkaConsumer::shutdown`; has a reply channel but no cancellation token.
    Shutdown {
        reply: oneshot::Sender<Result<()>>,
    },
}
889
890impl ConsumerRuntimeEvent {
891 fn is_cancelled(&self) -> bool {
892 match self {
893 Self::WarmUp { cancellation, .. }
894 | Self::Subscribe { cancellation, .. }
895 | Self::SubscribePattern { cancellation, .. }
896 | Self::SubscribeRegex { cancellation, .. }
897 | Self::Unsubscribe { cancellation, .. }
898 | Self::Assign { cancellation, .. }
899 | Self::Poll { cancellation, .. }
900 | Self::Seek { cancellation, .. }
901 | Self::SeekToBeginning { cancellation, .. }
902 | Self::SeekToEnd { cancellation, .. }
903 | Self::SeekToTimestamp { cancellation, .. }
904 | Self::Position { cancellation, .. }
905 | Self::Pause { cancellation, .. }
906 | Self::Resume { cancellation, .. }
907 | Self::GroupMetadata { cancellation, .. }
908 | Self::Assignment { cancellation, .. }
909 | Self::Committed { cancellation, .. }
910 | Self::BeginningOffsets { cancellation, .. }
911 | Self::EndOffsets { cancellation, .. }
912 | Self::OffsetsForTimes { cancellation, .. }
913 | Self::PartitionsFor { cancellation, .. }
914 | Self::ListTopics { cancellation, .. }
915 | Self::Commit { cancellation, .. } => cancellation
916 .as_ref()
917 .is_some_and(CancellationToken::is_cancelled),
918 Self::Wakeup | Self::Shutdown { .. } => false,
919 }
920 }
921
922 fn send_cancelled(self) {
923 match self {
924 Self::WarmUp { reply, .. }
925 | Self::Subscribe { reply, .. }
926 | Self::SubscribePattern { reply, .. }
927 | Self::SubscribeRegex { reply, .. }
928 | Self::Unsubscribe { reply, .. }
929 | Self::Assign { reply, .. }
930 | Self::Seek { reply, .. }
931 | Self::SeekToBeginning { reply, .. }
932 | Self::SeekToEnd { reply, .. }
933 | Self::SeekToTimestamp { reply, .. }
934 | Self::Pause { reply, .. }
935 | Self::Resume { reply, .. }
936 | Self::Commit { reply, .. } => {
937 let _ = reply.send(Err(crate::Error::Cancelled));
938 }
939 Self::Poll { reply, .. } => {
940 let _ = reply.send(Err(crate::Error::Cancelled));
941 }
942 Self::Position { reply, .. } => {
943 let _ = reply.send(Err(crate::Error::Cancelled));
944 }
945 Self::GroupMetadata { reply, .. } => {
946 let _ = reply.send(Err(crate::Error::Cancelled));
947 }
948 Self::Assignment { reply, .. } => {
949 let _ = reply.send(Err(crate::Error::Cancelled));
950 }
951 Self::Committed { reply, .. }
952 | Self::BeginningOffsets { reply, .. }
953 | Self::EndOffsets { reply, .. } => {
954 let _ = reply.send(Err(crate::Error::Cancelled));
955 }
956 Self::OffsetsForTimes { reply, .. } => {
957 let _ = reply.send(Err(crate::Error::Cancelled));
958 }
959 Self::PartitionsFor { reply, .. } => {
960 let _ = reply.send(Err(crate::Error::Cancelled));
961 }
962 Self::ListTopics { reply, .. } => {
963 let _ = reply.send(Err(crate::Error::Cancelled));
964 }
965 Self::Wakeup | Self::Shutdown { .. } => {}
966 }
967 }
968
969 fn send_failed(self, message: &str) {
970 let error = || {
971 crate::Error::Consumer(crate::ConsumerError::Fatal {
972 message: message.to_owned(),
973 })
974 };
975 match self {
976 Self::WarmUp { reply, .. }
977 | Self::Subscribe { reply, .. }
978 | Self::SubscribePattern { reply, .. }
979 | Self::SubscribeRegex { reply, .. }
980 | Self::Unsubscribe { reply, .. }
981 | Self::Assign { reply, .. }
982 | Self::Seek { reply, .. }
983 | Self::SeekToBeginning { reply, .. }
984 | Self::SeekToEnd { reply, .. }
985 | Self::SeekToTimestamp { reply, .. }
986 | Self::Pause { reply, .. }
987 | Self::Resume { reply, .. }
988 | Self::Commit { reply, .. } => {
989 let _ = reply.send(Err(error()));
990 }
991 Self::Poll { reply, .. } => {
992 let _ = reply.send(Err(error()));
993 }
994 Self::Position { reply, .. } => {
995 let _ = reply.send(Err(error()));
996 }
997 Self::GroupMetadata { reply, .. } => {
998 let _ = reply.send(Err(error()));
999 }
1000 Self::Assignment { reply, .. } => {
1001 let _ = reply.send(Err(error()));
1002 }
1003 Self::Committed { reply, .. }
1004 | Self::BeginningOffsets { reply, .. }
1005 | Self::EndOffsets { reply, .. } => {
1006 let _ = reply.send(Err(error()));
1007 }
1008 Self::OffsetsForTimes { reply, .. } => {
1009 let _ = reply.send(Err(error()));
1010 }
1011 Self::PartitionsFor { reply, .. } => {
1012 let _ = reply.send(Err(error()));
1013 }
1014 Self::ListTopics { reply, .. } => {
1015 let _ = reply.send(Err(error()));
1016 }
1017 Self::Wakeup | Self::Shutdown { .. } => {}
1018 }
1019 }
1020}