Skip to main content

kafkit_client/consumer/
mod.rs

1//! Group consumer API.
2//!
3//! ```no_run
4//! # async fn example() -> kafkit_client::Result<()> {
5//! use kafkit_client::KafkaClient;
6//!
7//! let consumer = KafkaClient::new("localhost:9092")
8//!     .consumer("orders-reader")
9//!     .with_topic("orders")
10//!     .connect()
11//!     .await?;
12//!
13//! let records = consumer.poll().await?;
14//! consumer.commit(&records).await?;
15//! consumer.shutdown().await?;
16//! # Ok(())
17//! # }
18//! ```
19//!
20mod protocol;
21mod runtime;
22mod scheduler;
23mod share;
24mod state;
25mod util;
26
27use std::collections::BTreeSet;
28use std::time::Duration;
29
30use tokio::sync::{mpsc, oneshot};
31use tokio::task::JoinHandle;
32use tracing::{debug, instrument};
33
34use crate::config::ConsumerConfig;
35use crate::types::{
36    CommitOffset, ConsumerGroupMetadata, ConsumerRecords, SubscriptionPattern, TopicPartition,
37    TopicPartitionInfo, TopicPartitionOffset, TopicPartitionOffsetAndTimestamp,
38    TopicPartitionTimestamp,
39};
40use crate::{CancellationToken, ConsumerError, Result};
41
42use runtime::ConsumerRuntime;
43
/// A Kafka group consumer with explicit polling and commits.
///
/// The value is a front-end for a spawned `ConsumerRuntime` task: every
/// operation is sent to that task as a `ConsumerRuntimeEvent` and, where a
/// result is expected, answered over a oneshot channel.
pub struct KafkaConsumer {
    /// Sending side of the event channel consumed by the runtime task.
    consumer_runtime: ConsumerRuntimeHandle,
    /// Handle of the spawned runtime task; aborted when `connect` fails to
    /// warm up and awaited by `shutdown`.
    join: JoinHandle<()>,
    /// Timeout passed to `poll_for` by `poll`; `connect` initialises it to
    /// 100 ms and `with_default_poll_timeout` overrides it.
    default_poll_timeout: Duration,
}
50
51pub use share::{
52    AcknowledgeType, AcknowledgementCommitCallback, KafkaShareConsumer, ShareAcknowledgementCommit,
53    ShareAcquireMode, ShareConsumerOptions, ShareRecord, ShareRecords,
54};
55
56impl KafkaConsumer {
57    #[instrument(
58        name = "consumer.connect",
59        level = "debug",
60        skip(config),
61        fields(
62            bootstrap_server_count = config.bootstrap_servers.len(),
63            client_id = %config.client_id,
64            group_id = %config.group_id
65        )
66    )]
67    /// Connects to Kafka and returns the client.
68    pub async fn connect(config: ConsumerConfig) -> Result<Self> {
69        let (tx, rx) = mpsc::channel(64);
70        let runtime = ConsumerRuntime::new(config);
71        let join = tokio::spawn(async move {
72            runtime.run(rx).await;
73        });
74
75        let consumer = Self {
76            consumer_runtime: ConsumerRuntimeHandle::new(tx),
77            join,
78            default_poll_timeout: Duration::from_millis(100),
79        };
80        if let Err(error) = consumer.warm_up().await {
81            consumer.join.abort();
82            return Err(error);
83        }
84        debug!("consumer connected");
85        Ok(consumer)
86    }
87
88    /// Sets default poll timeout and returns the updated value.
89    pub fn with_default_poll_timeout(mut self, timeout: Duration) -> Self {
90        self.default_poll_timeout = timeout;
91        self
92    }
93
94    #[instrument(
95        name = "consumer.subscribe",
96        level = "debug",
97        skip(self, topics),
98        fields(topic_count = topics.len())
99    )]
100    /// Subscribe.
101    pub async fn subscribe(&self, topics: Vec<String>) -> Result<()> {
102        let (reply_tx, reply_rx) = oneshot::channel();
103        self.consumer_runtime
104            .send(
105                ConsumerRuntimeEvent::Subscribe {
106                    topics,
107                    cancellation: None,
108                    reply: reply_tx,
109                },
110                ConsumerError::ThreadStoppedBefore {
111                    operation: "subscribe",
112                },
113            )
114            .await?;
115        reply_rx
116            .await
117            .map_err(|_| ConsumerError::ThreadStoppedDuring {
118                operation: "subscribe",
119            })?
120    }
121
122    #[instrument(
123        name = "consumer.subscribe_pattern",
124        level = "debug",
125        skip(self, pattern),
126        fields(pattern = %pattern.pattern())
127    )]
128    /// Subscribe Pattern.
129    pub async fn subscribe_pattern(&self, pattern: SubscriptionPattern) -> Result<()> {
130        let (reply_tx, reply_rx) = oneshot::channel();
131        self.consumer_runtime
132            .send(
133                ConsumerRuntimeEvent::SubscribePattern {
134                    pattern,
135                    cancellation: None,
136                    reply: reply_tx,
137                },
138                ConsumerError::ThreadStoppedBefore {
139                    operation: "subscribe_pattern",
140                },
141            )
142            .await?;
143        reply_rx
144            .await
145            .map_err(|_| ConsumerError::ThreadStoppedDuring {
146                operation: "subscribe_pattern",
147            })?
148    }
149
150    #[instrument(
151        name = "consumer.subscribe_regex",
152        level = "debug",
153        skip(self, pattern),
154        fields(pattern = %pattern)
155    )]
156    /// Subscribe Regex.
157    pub async fn subscribe_regex(&self, pattern: String) -> Result<()> {
158        let (reply_tx, reply_rx) = oneshot::channel();
159        self.consumer_runtime
160            .send(
161                ConsumerRuntimeEvent::SubscribeRegex {
162                    pattern,
163                    cancellation: None,
164                    reply: reply_tx,
165                },
166                ConsumerError::ThreadStoppedBefore {
167                    operation: "subscribe_regex",
168                },
169            )
170            .await?;
171        reply_rx
172            .await
173            .map_err(|_| ConsumerError::ThreadStoppedDuring {
174                operation: "subscribe_regex",
175            })?
176    }
177
178    #[instrument(name = "consumer.unsubscribe", level = "debug", skip(self))]
179    /// Unsubscribe.
180    pub async fn unsubscribe(&self) -> Result<()> {
181        let (reply_tx, reply_rx) = oneshot::channel();
182        self.consumer_runtime
183            .send(
184                ConsumerRuntimeEvent::Unsubscribe {
185                    cancellation: None,
186                    reply: reply_tx,
187                },
188                ConsumerError::ThreadStoppedBefore {
189                    operation: "unsubscribe",
190                },
191            )
192            .await?;
193        reply_rx
194            .await
195            .map_err(|_| ConsumerError::ThreadStoppedDuring {
196                operation: "unsubscribe",
197            })?
198    }
199
200    /// Polls Kafka for records.
201    pub async fn poll(&self) -> Result<ConsumerRecords> {
202        self.poll_for(self.default_poll_timeout).await
203    }
204
205    #[instrument(
206        name = "consumer.poll",
207        level = "debug",
208        skip(self),
209        fields(timeout_ms = timeout.as_millis())
210    )]
211    /// Poll For.
212    pub async fn poll_for(&self, timeout: Duration) -> Result<ConsumerRecords> {
213        let (reply_tx, reply_rx) = oneshot::channel();
214        self.consumer_runtime
215            .send(
216                ConsumerRuntimeEvent::Poll {
217                    timeout,
218                    cancellation: None,
219                    reply: reply_tx,
220                },
221                ConsumerError::ThreadStoppedBefore { operation: "poll" },
222            )
223            .await?;
224        reply_rx
225            .await
226            .map_err(|_| ConsumerError::ThreadStoppedDuring { operation: "poll" })?
227    }
228
229    #[instrument(
230        name = "consumer.assign",
231        level = "debug",
232        skip(self, partitions),
233        fields(partition_count = partitions.len())
234    )]
235    /// Assign.
236    pub async fn assign(&self, partitions: Vec<TopicPartition>) -> Result<()> {
237        let (reply_tx, reply_rx) = oneshot::channel();
238        self.consumer_runtime
239            .send(
240                ConsumerRuntimeEvent::Assign {
241                    partitions,
242                    cancellation: None,
243                    reply: reply_tx,
244                },
245                ConsumerError::ThreadStoppedBefore {
246                    operation: "assign",
247                },
248            )
249            .await?;
250        reply_rx
251            .await
252            .map_err(|_| ConsumerError::ThreadStoppedDuring {
253                operation: "assign",
254            })?
255    }
256
257    #[instrument(
258        name = "consumer.seek",
259        level = "debug",
260        skip(self, partition),
261        fields(topic = %partition.topic, partition_id = partition.partition, offset)
262    )]
263    /// Seek.
264    pub async fn seek(&self, partition: TopicPartition, offset: i64) -> Result<()> {
265        let (reply_tx, reply_rx) = oneshot::channel();
266        self.consumer_runtime
267            .send(
268                ConsumerRuntimeEvent::Seek {
269                    partition,
270                    offset,
271                    cancellation: None,
272                    reply: reply_tx,
273                },
274                ConsumerError::ThreadStoppedBefore { operation: "seek" },
275            )
276            .await?;
277        reply_rx
278            .await
279            .map_err(|_| ConsumerError::ThreadStoppedDuring { operation: "seek" })?
280    }
281
282    #[instrument(
283        name = "consumer.seek_to_beginning",
284        level = "debug",
285        skip(self, partitions),
286        fields(partition_count = partitions.len())
287    )]
288    /// Seek To Beginning.
289    pub async fn seek_to_beginning(&self, partitions: Vec<TopicPartition>) -> Result<()> {
290        let (reply_tx, reply_rx) = oneshot::channel();
291        self.consumer_runtime
292            .send(
293                ConsumerRuntimeEvent::SeekToBeginning {
294                    partitions,
295                    cancellation: None,
296                    reply: reply_tx,
297                },
298                ConsumerError::ThreadStoppedBefore {
299                    operation: "seek_to_beginning",
300                },
301            )
302            .await?;
303        reply_rx
304            .await
305            .map_err(|_| ConsumerError::ThreadStoppedDuring {
306                operation: "seek_to_beginning",
307            })?
308    }
309
310    #[instrument(
311        name = "consumer.seek_to_end",
312        level = "debug",
313        skip(self, partitions),
314        fields(partition_count = partitions.len())
315    )]
316    /// Seek To End.
317    pub async fn seek_to_end(&self, partitions: Vec<TopicPartition>) -> Result<()> {
318        let (reply_tx, reply_rx) = oneshot::channel();
319        self.consumer_runtime
320            .send(
321                ConsumerRuntimeEvent::SeekToEnd {
322                    partitions,
323                    cancellation: None,
324                    reply: reply_tx,
325                },
326                ConsumerError::ThreadStoppedBefore {
327                    operation: "seek_to_end",
328                },
329            )
330            .await?;
331        reply_rx
332            .await
333            .map_err(|_| ConsumerError::ThreadStoppedDuring {
334                operation: "seek_to_end",
335            })?
336    }
337
338    #[instrument(
339        name = "consumer.seek_to_timestamp",
340        level = "debug",
341        skip(self, partitions),
342        fields(partition_count = partitions.len())
343    )]
344    /// Seek To Timestamp.
345    pub async fn seek_to_timestamp(&self, partitions: Vec<TopicPartitionTimestamp>) -> Result<()> {
346        let (reply_tx, reply_rx) = oneshot::channel();
347        self.consumer_runtime
348            .send(
349                ConsumerRuntimeEvent::SeekToTimestamp {
350                    partitions,
351                    cancellation: None,
352                    reply: reply_tx,
353                },
354                ConsumerError::ThreadStoppedBefore {
355                    operation: "seek_to_timestamp",
356                },
357            )
358            .await?;
359        reply_rx
360            .await
361            .map_err(|_| ConsumerError::ThreadStoppedDuring {
362                operation: "seek_to_timestamp",
363            })?
364    }
365
366    #[instrument(
367        name = "consumer.position",
368        level = "debug",
369        skip(self, partition),
370        fields(topic = %partition.topic, partition_id = partition.partition)
371    )]
372    /// Position.
373    pub async fn position(&self, partition: TopicPartition) -> Result<i64> {
374        let (reply_tx, reply_rx) = oneshot::channel();
375        self.consumer_runtime
376            .send(
377                ConsumerRuntimeEvent::Position {
378                    partition,
379                    cancellation: None,
380                    reply: reply_tx,
381                },
382                ConsumerError::ThreadStoppedBefore {
383                    operation: "position",
384                },
385            )
386            .await?;
387        reply_rx
388            .await
389            .map_err(|_| ConsumerError::ThreadStoppedDuring {
390                operation: "position",
391            })?
392    }
393
394    #[instrument(
395        name = "consumer.pause",
396        level = "debug",
397        skip(self, partitions),
398        fields(partition_count = partitions.len())
399    )]
400    /// Pause.
401    pub async fn pause(&self, partitions: Vec<TopicPartition>) -> Result<()> {
402        let (reply_tx, reply_rx) = oneshot::channel();
403        self.consumer_runtime
404            .send(
405                ConsumerRuntimeEvent::Pause {
406                    partitions,
407                    cancellation: None,
408                    reply: reply_tx,
409                },
410                ConsumerError::ThreadStoppedBefore { operation: "pause" },
411            )
412            .await?;
413        reply_rx
414            .await
415            .map_err(|_| ConsumerError::ThreadStoppedDuring { operation: "pause" })?
416    }
417
418    #[instrument(
419        name = "consumer.resume",
420        level = "debug",
421        skip(self, partitions),
422        fields(partition_count = partitions.len())
423    )]
424    /// Resume.
425    pub async fn resume(&self, partitions: Vec<TopicPartition>) -> Result<()> {
426        let (reply_tx, reply_rx) = oneshot::channel();
427        self.consumer_runtime
428            .send(
429                ConsumerRuntimeEvent::Resume {
430                    partitions,
431                    cancellation: None,
432                    reply: reply_tx,
433                },
434                ConsumerError::ThreadStoppedBefore {
435                    operation: "resume",
436                },
437            )
438            .await?;
439        reply_rx
440            .await
441            .map_err(|_| ConsumerError::ThreadStoppedDuring {
442                operation: "resume",
443            })?
444    }
445
446    #[instrument(name = "consumer.group_metadata", level = "debug", skip(self))]
447    /// Group Metadata.
448    pub async fn group_metadata(&self) -> Result<ConsumerGroupMetadata> {
449        let (reply_tx, reply_rx) = oneshot::channel();
450        self.consumer_runtime
451            .send(
452                ConsumerRuntimeEvent::GroupMetadata {
453                    cancellation: None,
454                    reply: reply_tx,
455                },
456                ConsumerError::ThreadStoppedBefore {
457                    operation: "group_metadata",
458                },
459            )
460            .await?;
461        reply_rx
462            .await
463            .map_err(|_| ConsumerError::ThreadStoppedDuring {
464                operation: "group_metadata",
465            })?
466    }
467
468    #[instrument(name = "consumer.assignment", level = "debug", skip(self))]
469    /// Assignment.
470    pub async fn assignment(&self) -> Result<BTreeSet<TopicPartition>> {
471        let (reply_tx, reply_rx) = oneshot::channel();
472        self.consumer_runtime
473            .send(
474                ConsumerRuntimeEvent::Assignment {
475                    cancellation: None,
476                    reply: reply_tx,
477                },
478                ConsumerError::ThreadStoppedBefore {
479                    operation: "assignment",
480                },
481            )
482            .await?;
483        reply_rx
484            .await
485            .map_err(|_| ConsumerError::ThreadStoppedDuring {
486                operation: "assignment",
487            })?
488    }
489
490    #[instrument(
491        name = "consumer.committed_offsets",
492        level = "debug",
493        skip(self, partitions),
494        fields(partition_count = partitions.len())
495    )]
496    /// Committed.
497    pub async fn committed(
498        &self,
499        partitions: Vec<TopicPartition>,
500    ) -> Result<Vec<TopicPartitionOffset>> {
501        let (reply_tx, reply_rx) = oneshot::channel();
502        self.consumer_runtime
503            .send(
504                ConsumerRuntimeEvent::Committed {
505                    partitions,
506                    cancellation: None,
507                    reply: reply_tx,
508                },
509                ConsumerError::ThreadStoppedBefore {
510                    operation: "committed",
511                },
512            )
513            .await?;
514        reply_rx
515            .await
516            .map_err(|_| ConsumerError::ThreadStoppedDuring {
517                operation: "committed",
518            })?
519    }
520
521    #[instrument(
522        name = "consumer.beginning_offsets",
523        level = "debug",
524        skip(self, partitions),
525        fields(partition_count = partitions.len())
526    )]
527    /// Beginning Offsets.
528    pub async fn beginning_offsets(
529        &self,
530        partitions: Vec<TopicPartition>,
531    ) -> Result<Vec<TopicPartitionOffset>> {
532        let (reply_tx, reply_rx) = oneshot::channel();
533        self.consumer_runtime
534            .send(
535                ConsumerRuntimeEvent::BeginningOffsets {
536                    partitions,
537                    cancellation: None,
538                    reply: reply_tx,
539                },
540                ConsumerError::ThreadStoppedBefore {
541                    operation: "beginning_offsets",
542                },
543            )
544            .await?;
545        reply_rx
546            .await
547            .map_err(|_| ConsumerError::ThreadStoppedDuring {
548                operation: "beginning_offsets",
549            })?
550    }
551
552    #[instrument(
553        name = "consumer.end_offsets",
554        level = "debug",
555        skip(self, partitions),
556        fields(partition_count = partitions.len())
557    )]
558    /// End Offsets.
559    pub async fn end_offsets(
560        &self,
561        partitions: Vec<TopicPartition>,
562    ) -> Result<Vec<TopicPartitionOffset>> {
563        let (reply_tx, reply_rx) = oneshot::channel();
564        self.consumer_runtime
565            .send(
566                ConsumerRuntimeEvent::EndOffsets {
567                    partitions,
568                    cancellation: None,
569                    reply: reply_tx,
570                },
571                ConsumerError::ThreadStoppedBefore {
572                    operation: "end_offsets",
573                },
574            )
575            .await?;
576        reply_rx
577            .await
578            .map_err(|_| ConsumerError::ThreadStoppedDuring {
579                operation: "end_offsets",
580            })?
581    }
582
583    #[instrument(
584        name = "consumer.offsets_for_times",
585        level = "debug",
586        skip(self, partitions),
587        fields(partition_count = partitions.len())
588    )]
589    /// Offsets For Times.
590    pub async fn offsets_for_times(
591        &self,
592        partitions: Vec<TopicPartitionTimestamp>,
593    ) -> Result<Vec<TopicPartitionOffsetAndTimestamp>> {
594        let (reply_tx, reply_rx) = oneshot::channel();
595        self.consumer_runtime
596            .send(
597                ConsumerRuntimeEvent::OffsetsForTimes {
598                    partitions,
599                    cancellation: None,
600                    reply: reply_tx,
601                },
602                ConsumerError::ThreadStoppedBefore {
603                    operation: "offsets_for_times",
604                },
605            )
606            .await?;
607        reply_rx
608            .await
609            .map_err(|_| ConsumerError::ThreadStoppedDuring {
610                operation: "offsets_for_times",
611            })?
612    }
613
614    #[instrument(
615        name = "consumer.partitions_for",
616        level = "debug",
617        skip(self),
618        fields(topic = %topic)
619    )]
620    /// Partitions For.
621    pub async fn partitions_for(&self, topic: String) -> Result<Vec<TopicPartitionInfo>> {
622        let (reply_tx, reply_rx) = oneshot::channel();
623        self.consumer_runtime
624            .send(
625                ConsumerRuntimeEvent::PartitionsFor {
626                    topic,
627                    cancellation: None,
628                    reply: reply_tx,
629                },
630                ConsumerError::ThreadStoppedBefore {
631                    operation: "partitions_for",
632                },
633            )
634            .await?;
635        reply_rx
636            .await
637            .map_err(|_| ConsumerError::ThreadStoppedDuring {
638                operation: "partitions_for",
639            })?
640    }
641
642    #[instrument(name = "consumer.list_topics", level = "debug", skip(self))]
643    /// List Topics.
644    pub async fn list_topics(&self) -> Result<Vec<String>> {
645        let (reply_tx, reply_rx) = oneshot::channel();
646        self.consumer_runtime
647            .send(
648                ConsumerRuntimeEvent::ListTopics {
649                    cancellation: None,
650                    reply: reply_tx,
651                },
652                ConsumerError::ThreadStoppedBefore {
653                    operation: "list_topics",
654                },
655            )
656            .await?;
657        reply_rx
658            .await
659            .map_err(|_| ConsumerError::ThreadStoppedDuring {
660                operation: "list_topics",
661            })?
662    }
663
664    /// Commit.
665    pub async fn commit(&self, records: &ConsumerRecords) -> Result<()> {
666        self.commit_offsets(records.commit_offsets()).await
667    }
668
669    #[instrument(
670        name = "consumer.commit_offsets",
671        level = "debug",
672        skip(self, offsets),
673        fields(offset_count = offsets.len())
674    )]
675    /// Commit Offsets.
676    pub async fn commit_offsets(&self, offsets: Vec<CommitOffset>) -> Result<()> {
677        if offsets.is_empty() {
678            return Ok(());
679        }
680
681        let (reply_tx, reply_rx) = oneshot::channel();
682        self.consumer_runtime
683            .send(
684                ConsumerRuntimeEvent::Commit {
685                    offsets,
686                    cancellation: None,
687                    reply: reply_tx,
688                },
689                ConsumerError::ThreadStoppedBefore {
690                    operation: "commit",
691                },
692            )
693            .await?;
694        reply_rx
695            .await
696            .map_err(|_| ConsumerError::ThreadStoppedDuring {
697                operation: "commit",
698            })?
699    }
700
701    #[instrument(name = "consumer.wakeup", level = "debug", skip(self))]
702    /// Wakeup.
703    pub async fn wakeup(&self) -> Result<()> {
704        self.consumer_runtime
705            .send(
706                ConsumerRuntimeEvent::Wakeup,
707                ConsumerError::ThreadStoppedBefore {
708                    operation: "wakeup",
709                },
710            )
711            .await
712    }
713
714    #[instrument(name = "consumer.shutdown", level = "debug", skip(self))]
715    /// Shuts the client down and waits for in-flight work to finish.
716    pub async fn shutdown(self) -> Result<()> {
717        let (reply_tx, reply_rx) = oneshot::channel();
718        self.consumer_runtime
719            .send(
720                ConsumerRuntimeEvent::Shutdown { reply: reply_tx },
721                ConsumerError::ThreadStoppedBefore {
722                    operation: "shutdown",
723                },
724            )
725            .await?;
726
727        let result = reply_rx
728            .await
729            .map_err(|_| ConsumerError::ThreadStoppedDuring {
730                operation: "shutdown",
731            })?;
732        self.join.await.map_err(ConsumerError::Join)?;
733        result
734    }
735
736    #[instrument(name = "consumer.warm_up", level = "trace", skip(self))]
737    async fn warm_up(&self) -> Result<()> {
738        let (reply_tx, reply_rx) = oneshot::channel();
739        self.consumer_runtime
740            .send(
741                ConsumerRuntimeEvent::WarmUp {
742                    cancellation: None,
743                    reply: reply_tx,
744                },
745                ConsumerError::ThreadStoppedBefore {
746                    operation: "startup",
747                },
748            )
749            .await?;
750        reply_rx
751            .await
752            .map_err(|_| ConsumerError::ThreadStoppedDuring {
753                operation: "startup",
754            })?
755    }
756}
757
/// Thin wrapper around the sending half of the runtime's event channel.
struct ConsumerRuntimeHandle {
    /// Sender for `ConsumerRuntimeEvent`s; the matching receiver is handed to
    /// the runtime's `run` loop in `KafkaConsumer::connect`.
    tx: mpsc::Sender<ConsumerRuntimeEvent>,
}
761
762impl ConsumerRuntimeHandle {
763    fn new(tx: mpsc::Sender<ConsumerRuntimeEvent>) -> Self {
764        Self { tx }
765    }
766
767    async fn send(&self, event: ConsumerRuntimeEvent, stopped_error: ConsumerError) -> Result<()> {
768        self.tx.send(event).await.map_err(|_| stopped_error.into())
769    }
770}
771
/// Messages sent from the consumer front-end to the runtime task over the
/// mpsc event channel.
///
/// Most variants carry two common fields:
///
/// * `cancellation` — an optional token inspected by `is_cancelled`; the
///   `KafkaConsumer` methods in this file always pass `None`.
/// * `reply` — the oneshot sender on which the outcome of the request is
///   delivered to the waiting caller.
enum ConsumerRuntimeEvent {
    /// Warm-up request issued by `KafkaConsumer::connect` right after the
    /// runtime task is spawned.
    WarmUp {
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Request issued by `KafkaConsumer::subscribe` with a list of topic names.
    Subscribe {
        topics: Vec<String>,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Request issued by `KafkaConsumer::subscribe_pattern`.
    SubscribePattern {
        pattern: SubscriptionPattern,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Request issued by `KafkaConsumer::subscribe_regex` with the pattern
    /// as a plain string.
    SubscribeRegex {
        pattern: String,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Request issued by `KafkaConsumer::unsubscribe`.
    Unsubscribe {
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Request issued by `KafkaConsumer::assign`.
    Assign {
        partitions: Vec<TopicPartition>,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Request issued by `KafkaConsumer::poll_for`; replies with the polled
    /// records.
    Poll {
        timeout: Duration,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<ConsumerRecords>>,
    },
    /// Request issued by `KafkaConsumer::seek` for one partition and offset.
    Seek {
        partition: TopicPartition,
        offset: i64,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Request issued by `KafkaConsumer::seek_to_beginning`.
    SeekToBeginning {
        partitions: Vec<TopicPartition>,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Request issued by `KafkaConsumer::seek_to_end`.
    SeekToEnd {
        partitions: Vec<TopicPartition>,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Request issued by `KafkaConsumer::seek_to_timestamp`.
    SeekToTimestamp {
        partitions: Vec<TopicPartitionTimestamp>,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Request issued by `KafkaConsumer::position`; replies with an `i64`.
    Position {
        partition: TopicPartition,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<i64>>,
    },
    /// Request issued by `KafkaConsumer::pause`.
    Pause {
        partitions: Vec<TopicPartition>,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Request issued by `KafkaConsumer::resume`.
    Resume {
        partitions: Vec<TopicPartition>,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Request issued by `KafkaConsumer::group_metadata`.
    GroupMetadata {
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<ConsumerGroupMetadata>>,
    },
    /// Request issued by `KafkaConsumer::assignment`; replies with a set of
    /// topic partitions.
    Assignment {
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<BTreeSet<TopicPartition>>>,
    },
    /// Request issued by `KafkaConsumer::committed`.
    Committed {
        partitions: Vec<TopicPartition>,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<Vec<TopicPartitionOffset>>>,
    },
    /// Request issued by `KafkaConsumer::beginning_offsets`.
    BeginningOffsets {
        partitions: Vec<TopicPartition>,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<Vec<TopicPartitionOffset>>>,
    },
    /// Request issued by `KafkaConsumer::end_offsets`.
    EndOffsets {
        partitions: Vec<TopicPartition>,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<Vec<TopicPartitionOffset>>>,
    },
    /// Request issued by `KafkaConsumer::offsets_for_times`.
    OffsetsForTimes {
        partitions: Vec<TopicPartitionTimestamp>,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<Vec<TopicPartitionOffsetAndTimestamp>>>,
    },
    /// Request issued by `KafkaConsumer::partitions_for` for a single topic.
    PartitionsFor {
        topic: String,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<Vec<TopicPartitionInfo>>>,
    },
    /// Request issued by `KafkaConsumer::list_topics`; replies with topic names.
    ListTopics {
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<Vec<String>>>,
    },
    /// Request issued by `KafkaConsumer::commit_offsets`.
    Commit {
        offsets: Vec<CommitOffset>,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Fire-and-forget signal issued by `KafkaConsumer::wakeup`; it has no
    /// reply channel and no cancellation token.
    Wakeup,
    /// Request issued by `KafkaConsumer::shutdown`; it has a reply channel
    /// but no cancellation token.
    Shutdown {
        reply: oneshot::Sender<Result<()>>,
    },
}
889
890impl ConsumerRuntimeEvent {
891    fn is_cancelled(&self) -> bool {
892        match self {
893            Self::WarmUp { cancellation, .. }
894            | Self::Subscribe { cancellation, .. }
895            | Self::SubscribePattern { cancellation, .. }
896            | Self::SubscribeRegex { cancellation, .. }
897            | Self::Unsubscribe { cancellation, .. }
898            | Self::Assign { cancellation, .. }
899            | Self::Poll { cancellation, .. }
900            | Self::Seek { cancellation, .. }
901            | Self::SeekToBeginning { cancellation, .. }
902            | Self::SeekToEnd { cancellation, .. }
903            | Self::SeekToTimestamp { cancellation, .. }
904            | Self::Position { cancellation, .. }
905            | Self::Pause { cancellation, .. }
906            | Self::Resume { cancellation, .. }
907            | Self::GroupMetadata { cancellation, .. }
908            | Self::Assignment { cancellation, .. }
909            | Self::Committed { cancellation, .. }
910            | Self::BeginningOffsets { cancellation, .. }
911            | Self::EndOffsets { cancellation, .. }
912            | Self::OffsetsForTimes { cancellation, .. }
913            | Self::PartitionsFor { cancellation, .. }
914            | Self::ListTopics { cancellation, .. }
915            | Self::Commit { cancellation, .. } => cancellation
916                .as_ref()
917                .is_some_and(CancellationToken::is_cancelled),
918            Self::Wakeup | Self::Shutdown { .. } => false,
919        }
920    }
921
922    fn send_cancelled(self) {
923        match self {
924            Self::WarmUp { reply, .. }
925            | Self::Subscribe { reply, .. }
926            | Self::SubscribePattern { reply, .. }
927            | Self::SubscribeRegex { reply, .. }
928            | Self::Unsubscribe { reply, .. }
929            | Self::Assign { reply, .. }
930            | Self::Seek { reply, .. }
931            | Self::SeekToBeginning { reply, .. }
932            | Self::SeekToEnd { reply, .. }
933            | Self::SeekToTimestamp { reply, .. }
934            | Self::Pause { reply, .. }
935            | Self::Resume { reply, .. }
936            | Self::Commit { reply, .. } => {
937                let _ = reply.send(Err(crate::Error::Cancelled));
938            }
939            Self::Poll { reply, .. } => {
940                let _ = reply.send(Err(crate::Error::Cancelled));
941            }
942            Self::Position { reply, .. } => {
943                let _ = reply.send(Err(crate::Error::Cancelled));
944            }
945            Self::GroupMetadata { reply, .. } => {
946                let _ = reply.send(Err(crate::Error::Cancelled));
947            }
948            Self::Assignment { reply, .. } => {
949                let _ = reply.send(Err(crate::Error::Cancelled));
950            }
951            Self::Committed { reply, .. }
952            | Self::BeginningOffsets { reply, .. }
953            | Self::EndOffsets { reply, .. } => {
954                let _ = reply.send(Err(crate::Error::Cancelled));
955            }
956            Self::OffsetsForTimes { reply, .. } => {
957                let _ = reply.send(Err(crate::Error::Cancelled));
958            }
959            Self::PartitionsFor { reply, .. } => {
960                let _ = reply.send(Err(crate::Error::Cancelled));
961            }
962            Self::ListTopics { reply, .. } => {
963                let _ = reply.send(Err(crate::Error::Cancelled));
964            }
965            Self::Wakeup | Self::Shutdown { .. } => {}
966        }
967    }
968
969    fn send_failed(self, message: &str) {
970        let error = || {
971            crate::Error::Consumer(crate::ConsumerError::Fatal {
972                message: message.to_owned(),
973            })
974        };
975        match self {
976            Self::WarmUp { reply, .. }
977            | Self::Subscribe { reply, .. }
978            | Self::SubscribePattern { reply, .. }
979            | Self::SubscribeRegex { reply, .. }
980            | Self::Unsubscribe { reply, .. }
981            | Self::Assign { reply, .. }
982            | Self::Seek { reply, .. }
983            | Self::SeekToBeginning { reply, .. }
984            | Self::SeekToEnd { reply, .. }
985            | Self::SeekToTimestamp { reply, .. }
986            | Self::Pause { reply, .. }
987            | Self::Resume { reply, .. }
988            | Self::Commit { reply, .. } => {
989                let _ = reply.send(Err(error()));
990            }
991            Self::Poll { reply, .. } => {
992                let _ = reply.send(Err(error()));
993            }
994            Self::Position { reply, .. } => {
995                let _ = reply.send(Err(error()));
996            }
997            Self::GroupMetadata { reply, .. } => {
998                let _ = reply.send(Err(error()));
999            }
1000            Self::Assignment { reply, .. } => {
1001                let _ = reply.send(Err(error()));
1002            }
1003            Self::Committed { reply, .. }
1004            | Self::BeginningOffsets { reply, .. }
1005            | Self::EndOffsets { reply, .. } => {
1006                let _ = reply.send(Err(error()));
1007            }
1008            Self::OffsetsForTimes { reply, .. } => {
1009                let _ = reply.send(Err(error()));
1010            }
1011            Self::PartitionsFor { reply, .. } => {
1012                let _ = reply.send(Err(error()));
1013            }
1014            Self::ListTopics { reply, .. } => {
1015                let _ = reply.send(Err(error()));
1016            }
1017            Self::Wakeup | Self::Shutdown { .. } => {}
1018        }
1019    }
1020}