Skip to main content

kafkit_client/consumer/
mod.rs

1//! Group consumer API.
2//!
3//! ```no_run
4//! # async fn example() -> kafkit_client::Result<()> {
5//! use kafkit_client::KafkaClient;
6//!
7//! let consumer = KafkaClient::new("localhost:9092")
8//!     .consumer("orders-reader")
9//!     .with_topic("orders")
10//!     .connect()
11//!     .await?;
12//!
13//! let records = consumer.poll().await?;
14//! consumer.commit(&records).await?;
15//! consumer.shutdown().await?;
16//! # Ok(())
17//! # }
18//! ```
19//!
20mod protocol;
21mod runtime;
22mod scheduler;
23mod share;
24mod state;
25
26use std::collections::BTreeSet;
27use std::time::Duration;
28
29use tokio::sync::{mpsc, oneshot};
30use tokio::task::JoinHandle;
31use tracing::{debug, instrument};
32
33use crate::config::ConsumerConfig;
34use crate::types::{
35    CommitOffset, ConsumerGroupMetadata, ConsumerRecords, SubscriptionPattern, TopicPartition,
36    TopicPartitionInfo, TopicPartitionOffset, TopicPartitionOffsetAndTimestamp,
37    TopicPartitionTimestamp,
38};
39use crate::{ConsumerError, Result};
40
41use runtime::ConsumerRuntime;
42
43/// A Kafka group consumer with explicit polling and commits.
44pub struct KafkaConsumer {
45    application_event_handler: ConsumerApplicationEventHandler,
46    join: JoinHandle<()>,
47    default_poll_timeout: Duration,
48}
49
50pub use share::{
51    AcknowledgeType, AcknowledgementCommitCallback, KafkaShareConsumer, ShareAcknowledgementCommit,
52    ShareAcquireMode, ShareConsumerOptions, ShareRecord, ShareRecords,
53};
54
55impl KafkaConsumer {
56    #[instrument(
57        name = "consumer.connect",
58        level = "debug",
59        skip(config),
60        fields(
61            bootstrap_server_count = config.bootstrap_servers.len(),
62            client_id = %config.client_id,
63            group_id = %config.group_id
64        )
65    )]
66    /// Connects to Kafka and returns the client.
67    pub async fn connect(config: ConsumerConfig) -> Result<Self> {
68        let (tx, rx) = mpsc::channel(64);
69        let runtime = ConsumerRuntime::new(config);
70        let join = tokio::spawn(async move {
71            runtime.run(rx).await;
72        });
73
74        let consumer = Self {
75            application_event_handler: ConsumerApplicationEventHandler::new(tx),
76            join,
77            default_poll_timeout: Duration::from_millis(100),
78        };
79        if let Err(error) = consumer.warm_up().await {
80            consumer.join.abort();
81            return Err(error);
82        }
83        debug!("consumer connected");
84        Ok(consumer)
85    }
86
87    /// Sets default poll timeout and returns the updated value.
88    pub fn with_default_poll_timeout(mut self, timeout: Duration) -> Self {
89        self.default_poll_timeout = timeout;
90        self
91    }
92
93    #[instrument(
94        name = "consumer.subscribe",
95        level = "debug",
96        skip(self, topics),
97        fields(topic_count = topics.len())
98    )]
99    /// Subscribe.
100    pub async fn subscribe(&self, topics: Vec<String>) -> Result<()> {
101        let (reply_tx, reply_rx) = oneshot::channel();
102        self.application_event_handler
103            .send(
104                ConsumerRuntimeEvent::Subscribe {
105                    topics,
106                    reply: reply_tx,
107                },
108                ConsumerError::ThreadStoppedBefore {
109                    operation: "subscribe",
110                },
111            )
112            .await?;
113        reply_rx
114            .await
115            .map_err(|_| ConsumerError::ThreadStoppedDuring {
116                operation: "subscribe",
117            })?
118    }
119
120    #[instrument(
121        name = "consumer.subscribe_pattern",
122        level = "debug",
123        skip(self, pattern),
124        fields(pattern = %pattern.pattern())
125    )]
126    /// Subscribe Pattern.
127    pub async fn subscribe_pattern(&self, pattern: SubscriptionPattern) -> Result<()> {
128        let (reply_tx, reply_rx) = oneshot::channel();
129        self.application_event_handler
130            .send(
131                ConsumerRuntimeEvent::SubscribePattern {
132                    pattern,
133                    reply: reply_tx,
134                },
135                ConsumerError::ThreadStoppedBefore {
136                    operation: "subscribe_pattern",
137                },
138            )
139            .await?;
140        reply_rx
141            .await
142            .map_err(|_| ConsumerError::ThreadStoppedDuring {
143                operation: "subscribe_pattern",
144            })?
145    }
146
147    #[instrument(
148        name = "consumer.subscribe_regex",
149        level = "debug",
150        skip(self, pattern),
151        fields(pattern = %pattern)
152    )]
153    /// Subscribe Regex.
154    pub async fn subscribe_regex(&self, pattern: String) -> Result<()> {
155        let (reply_tx, reply_rx) = oneshot::channel();
156        self.application_event_handler
157            .send(
158                ConsumerRuntimeEvent::SubscribeRegex {
159                    pattern,
160                    reply: reply_tx,
161                },
162                ConsumerError::ThreadStoppedBefore {
163                    operation: "subscribe_regex",
164                },
165            )
166            .await?;
167        reply_rx
168            .await
169            .map_err(|_| ConsumerError::ThreadStoppedDuring {
170                operation: "subscribe_regex",
171            })?
172    }
173
174    #[instrument(name = "consumer.unsubscribe", level = "debug", skip(self))]
175    /// Unsubscribe.
176    pub async fn unsubscribe(&self) -> Result<()> {
177        let (reply_tx, reply_rx) = oneshot::channel();
178        self.application_event_handler
179            .send(
180                ConsumerRuntimeEvent::Unsubscribe { reply: reply_tx },
181                ConsumerError::ThreadStoppedBefore {
182                    operation: "unsubscribe",
183                },
184            )
185            .await?;
186        reply_rx
187            .await
188            .map_err(|_| ConsumerError::ThreadStoppedDuring {
189                operation: "unsubscribe",
190            })?
191    }
192
193    /// Polls Kafka for records.
194    pub async fn poll(&self) -> Result<ConsumerRecords> {
195        self.poll_for(self.default_poll_timeout).await
196    }
197
198    #[instrument(
199        name = "consumer.poll",
200        level = "debug",
201        skip(self),
202        fields(timeout_ms = timeout.as_millis())
203    )]
204    /// Poll For.
205    pub async fn poll_for(&self, timeout: Duration) -> Result<ConsumerRecords> {
206        let (reply_tx, reply_rx) = oneshot::channel();
207        self.application_event_handler
208            .send(
209                ConsumerRuntimeEvent::Poll {
210                    timeout,
211                    reply: reply_tx,
212                },
213                ConsumerError::ThreadStoppedBefore { operation: "poll" },
214            )
215            .await?;
216        reply_rx
217            .await
218            .map_err(|_| ConsumerError::ThreadStoppedDuring { operation: "poll" })?
219    }
220
221    #[instrument(
222        name = "consumer.assign",
223        level = "debug",
224        skip(self, partitions),
225        fields(partition_count = partitions.len())
226    )]
227    /// Assign.
228    pub async fn assign(&self, partitions: Vec<TopicPartition>) -> Result<()> {
229        let (reply_tx, reply_rx) = oneshot::channel();
230        self.application_event_handler
231            .send(
232                ConsumerRuntimeEvent::Assign {
233                    partitions,
234                    reply: reply_tx,
235                },
236                ConsumerError::ThreadStoppedBefore {
237                    operation: "assign",
238                },
239            )
240            .await?;
241        reply_rx
242            .await
243            .map_err(|_| ConsumerError::ThreadStoppedDuring {
244                operation: "assign",
245            })?
246    }
247
248    #[instrument(
249        name = "consumer.seek",
250        level = "debug",
251        skip(self, partition),
252        fields(topic = %partition.topic, partition_id = partition.partition, offset)
253    )]
254    /// Seek.
255    pub async fn seek(&self, partition: TopicPartition, offset: i64) -> Result<()> {
256        let (reply_tx, reply_rx) = oneshot::channel();
257        self.application_event_handler
258            .send(
259                ConsumerRuntimeEvent::Seek {
260                    partition,
261                    offset,
262                    reply: reply_tx,
263                },
264                ConsumerError::ThreadStoppedBefore { operation: "seek" },
265            )
266            .await?;
267        reply_rx
268            .await
269            .map_err(|_| ConsumerError::ThreadStoppedDuring { operation: "seek" })?
270    }
271
272    #[instrument(
273        name = "consumer.seek_to_beginning",
274        level = "debug",
275        skip(self, partitions),
276        fields(partition_count = partitions.len())
277    )]
278    /// Seek To Beginning.
279    pub async fn seek_to_beginning(&self, partitions: Vec<TopicPartition>) -> Result<()> {
280        let (reply_tx, reply_rx) = oneshot::channel();
281        self.application_event_handler
282            .send(
283                ConsumerRuntimeEvent::SeekToBeginning {
284                    partitions,
285                    reply: reply_tx,
286                },
287                ConsumerError::ThreadStoppedBefore {
288                    operation: "seek_to_beginning",
289                },
290            )
291            .await?;
292        reply_rx
293            .await
294            .map_err(|_| ConsumerError::ThreadStoppedDuring {
295                operation: "seek_to_beginning",
296            })?
297    }
298
299    #[instrument(
300        name = "consumer.seek_to_end",
301        level = "debug",
302        skip(self, partitions),
303        fields(partition_count = partitions.len())
304    )]
305    /// Seek To End.
306    pub async fn seek_to_end(&self, partitions: Vec<TopicPartition>) -> Result<()> {
307        let (reply_tx, reply_rx) = oneshot::channel();
308        self.application_event_handler
309            .send(
310                ConsumerRuntimeEvent::SeekToEnd {
311                    partitions,
312                    reply: reply_tx,
313                },
314                ConsumerError::ThreadStoppedBefore {
315                    operation: "seek_to_end",
316                },
317            )
318            .await?;
319        reply_rx
320            .await
321            .map_err(|_| ConsumerError::ThreadStoppedDuring {
322                operation: "seek_to_end",
323            })?
324    }
325
326    #[instrument(
327        name = "consumer.seek_to_timestamp",
328        level = "debug",
329        skip(self, partitions),
330        fields(partition_count = partitions.len())
331    )]
332    /// Seek To Timestamp.
333    pub async fn seek_to_timestamp(&self, partitions: Vec<TopicPartitionTimestamp>) -> Result<()> {
334        let (reply_tx, reply_rx) = oneshot::channel();
335        self.application_event_handler
336            .send(
337                ConsumerRuntimeEvent::SeekToTimestamp {
338                    partitions,
339                    reply: reply_tx,
340                },
341                ConsumerError::ThreadStoppedBefore {
342                    operation: "seek_to_timestamp",
343                },
344            )
345            .await?;
346        reply_rx
347            .await
348            .map_err(|_| ConsumerError::ThreadStoppedDuring {
349                operation: "seek_to_timestamp",
350            })?
351    }
352
353    #[instrument(
354        name = "consumer.position",
355        level = "debug",
356        skip(self, partition),
357        fields(topic = %partition.topic, partition_id = partition.partition)
358    )]
359    /// Position.
360    pub async fn position(&self, partition: TopicPartition) -> Result<i64> {
361        let (reply_tx, reply_rx) = oneshot::channel();
362        self.application_event_handler
363            .send(
364                ConsumerRuntimeEvent::Position {
365                    partition,
366                    reply: reply_tx,
367                },
368                ConsumerError::ThreadStoppedBefore {
369                    operation: "position",
370                },
371            )
372            .await?;
373        reply_rx
374            .await
375            .map_err(|_| ConsumerError::ThreadStoppedDuring {
376                operation: "position",
377            })?
378    }
379
380    #[instrument(
381        name = "consumer.pause",
382        level = "debug",
383        skip(self, partitions),
384        fields(partition_count = partitions.len())
385    )]
386    /// Pause.
387    pub async fn pause(&self, partitions: Vec<TopicPartition>) -> Result<()> {
388        let (reply_tx, reply_rx) = oneshot::channel();
389        self.application_event_handler
390            .send(
391                ConsumerRuntimeEvent::Pause {
392                    partitions,
393                    reply: reply_tx,
394                },
395                ConsumerError::ThreadStoppedBefore { operation: "pause" },
396            )
397            .await?;
398        reply_rx
399            .await
400            .map_err(|_| ConsumerError::ThreadStoppedDuring { operation: "pause" })?
401    }
402
403    #[instrument(
404        name = "consumer.resume",
405        level = "debug",
406        skip(self, partitions),
407        fields(partition_count = partitions.len())
408    )]
409    /// Resume.
410    pub async fn resume(&self, partitions: Vec<TopicPartition>) -> Result<()> {
411        let (reply_tx, reply_rx) = oneshot::channel();
412        self.application_event_handler
413            .send(
414                ConsumerRuntimeEvent::Resume {
415                    partitions,
416                    reply: reply_tx,
417                },
418                ConsumerError::ThreadStoppedBefore {
419                    operation: "resume",
420                },
421            )
422            .await?;
423        reply_rx
424            .await
425            .map_err(|_| ConsumerError::ThreadStoppedDuring {
426                operation: "resume",
427            })?
428    }
429
430    #[instrument(name = "consumer.group_metadata", level = "debug", skip(self))]
431    /// Group Metadata.
432    pub async fn group_metadata(&self) -> Result<ConsumerGroupMetadata> {
433        let (reply_tx, reply_rx) = oneshot::channel();
434        self.application_event_handler
435            .send(
436                ConsumerRuntimeEvent::GroupMetadata { reply: reply_tx },
437                ConsumerError::ThreadStoppedBefore {
438                    operation: "group_metadata",
439                },
440            )
441            .await?;
442        reply_rx
443            .await
444            .map_err(|_| ConsumerError::ThreadStoppedDuring {
445                operation: "group_metadata",
446            })?
447    }
448
449    #[instrument(name = "consumer.assignment", level = "debug", skip(self))]
450    /// Assignment.
451    pub async fn assignment(&self) -> Result<BTreeSet<TopicPartition>> {
452        let (reply_tx, reply_rx) = oneshot::channel();
453        self.application_event_handler
454            .send(
455                ConsumerRuntimeEvent::Assignment { reply: reply_tx },
456                ConsumerError::ThreadStoppedBefore {
457                    operation: "assignment",
458                },
459            )
460            .await?;
461        reply_rx
462            .await
463            .map_err(|_| ConsumerError::ThreadStoppedDuring {
464                operation: "assignment",
465            })?
466    }
467
468    #[instrument(
469        name = "consumer.committed_offsets",
470        level = "debug",
471        skip(self, partitions),
472        fields(partition_count = partitions.len())
473    )]
474    /// Committed.
475    pub async fn committed(
476        &self,
477        partitions: Vec<TopicPartition>,
478    ) -> Result<Vec<TopicPartitionOffset>> {
479        let (reply_tx, reply_rx) = oneshot::channel();
480        self.application_event_handler
481            .send(
482                ConsumerRuntimeEvent::Committed {
483                    partitions,
484                    reply: reply_tx,
485                },
486                ConsumerError::ThreadStoppedBefore {
487                    operation: "committed",
488                },
489            )
490            .await?;
491        reply_rx
492            .await
493            .map_err(|_| ConsumerError::ThreadStoppedDuring {
494                operation: "committed",
495            })?
496    }
497
498    #[instrument(
499        name = "consumer.beginning_offsets",
500        level = "debug",
501        skip(self, partitions),
502        fields(partition_count = partitions.len())
503    )]
504    /// Beginning Offsets.
505    pub async fn beginning_offsets(
506        &self,
507        partitions: Vec<TopicPartition>,
508    ) -> Result<Vec<TopicPartitionOffset>> {
509        let (reply_tx, reply_rx) = oneshot::channel();
510        self.application_event_handler
511            .send(
512                ConsumerRuntimeEvent::BeginningOffsets {
513                    partitions,
514                    reply: reply_tx,
515                },
516                ConsumerError::ThreadStoppedBefore {
517                    operation: "beginning_offsets",
518                },
519            )
520            .await?;
521        reply_rx
522            .await
523            .map_err(|_| ConsumerError::ThreadStoppedDuring {
524                operation: "beginning_offsets",
525            })?
526    }
527
528    #[instrument(
529        name = "consumer.end_offsets",
530        level = "debug",
531        skip(self, partitions),
532        fields(partition_count = partitions.len())
533    )]
534    /// End Offsets.
535    pub async fn end_offsets(
536        &self,
537        partitions: Vec<TopicPartition>,
538    ) -> Result<Vec<TopicPartitionOffset>> {
539        let (reply_tx, reply_rx) = oneshot::channel();
540        self.application_event_handler
541            .send(
542                ConsumerRuntimeEvent::EndOffsets {
543                    partitions,
544                    reply: reply_tx,
545                },
546                ConsumerError::ThreadStoppedBefore {
547                    operation: "end_offsets",
548                },
549            )
550            .await?;
551        reply_rx
552            .await
553            .map_err(|_| ConsumerError::ThreadStoppedDuring {
554                operation: "end_offsets",
555            })?
556    }
557
558    #[instrument(
559        name = "consumer.offsets_for_times",
560        level = "debug",
561        skip(self, partitions),
562        fields(partition_count = partitions.len())
563    )]
564    /// Offsets For Times.
565    pub async fn offsets_for_times(
566        &self,
567        partitions: Vec<TopicPartitionTimestamp>,
568    ) -> Result<Vec<TopicPartitionOffsetAndTimestamp>> {
569        let (reply_tx, reply_rx) = oneshot::channel();
570        self.application_event_handler
571            .send(
572                ConsumerRuntimeEvent::OffsetsForTimes {
573                    partitions,
574                    reply: reply_tx,
575                },
576                ConsumerError::ThreadStoppedBefore {
577                    operation: "offsets_for_times",
578                },
579            )
580            .await?;
581        reply_rx
582            .await
583            .map_err(|_| ConsumerError::ThreadStoppedDuring {
584                operation: "offsets_for_times",
585            })?
586    }
587
588    #[instrument(
589        name = "consumer.partitions_for",
590        level = "debug",
591        skip(self),
592        fields(topic = %topic)
593    )]
594    /// Partitions For.
595    pub async fn partitions_for(&self, topic: String) -> Result<Vec<TopicPartitionInfo>> {
596        let (reply_tx, reply_rx) = oneshot::channel();
597        self.application_event_handler
598            .send(
599                ConsumerRuntimeEvent::PartitionsFor {
600                    topic,
601                    reply: reply_tx,
602                },
603                ConsumerError::ThreadStoppedBefore {
604                    operation: "partitions_for",
605                },
606            )
607            .await?;
608        reply_rx
609            .await
610            .map_err(|_| ConsumerError::ThreadStoppedDuring {
611                operation: "partitions_for",
612            })?
613    }
614
615    #[instrument(name = "consumer.list_topics", level = "debug", skip(self))]
616    /// List Topics.
617    pub async fn list_topics(&self) -> Result<Vec<String>> {
618        let (reply_tx, reply_rx) = oneshot::channel();
619        self.application_event_handler
620            .send(
621                ConsumerRuntimeEvent::ListTopics { reply: reply_tx },
622                ConsumerError::ThreadStoppedBefore {
623                    operation: "list_topics",
624                },
625            )
626            .await?;
627        reply_rx
628            .await
629            .map_err(|_| ConsumerError::ThreadStoppedDuring {
630                operation: "list_topics",
631            })?
632    }
633
634    /// Commit.
635    pub async fn commit(&self, records: &ConsumerRecords) -> Result<()> {
636        self.commit_offsets(records.commit_offsets()).await
637    }
638
639    #[instrument(
640        name = "consumer.commit_offsets",
641        level = "debug",
642        skip(self, offsets),
643        fields(offset_count = offsets.len())
644    )]
645    /// Commit Offsets.
646    pub async fn commit_offsets(&self, offsets: Vec<CommitOffset>) -> Result<()> {
647        if offsets.is_empty() {
648            return Ok(());
649        }
650
651        let (reply_tx, reply_rx) = oneshot::channel();
652        self.application_event_handler
653            .send(
654                ConsumerRuntimeEvent::Commit {
655                    offsets,
656                    reply: reply_tx,
657                },
658                ConsumerError::ThreadStoppedBefore {
659                    operation: "commit",
660                },
661            )
662            .await?;
663        reply_rx
664            .await
665            .map_err(|_| ConsumerError::ThreadStoppedDuring {
666                operation: "commit",
667            })?
668    }
669
670    #[instrument(name = "consumer.wakeup", level = "debug", skip(self))]
671    /// Wakeup.
672    pub async fn wakeup(&self) -> Result<()> {
673        self.application_event_handler
674            .send(
675                ConsumerRuntimeEvent::Wakeup,
676                ConsumerError::ThreadStoppedBefore {
677                    operation: "wakeup",
678                },
679            )
680            .await
681    }
682
683    #[instrument(name = "consumer.shutdown", level = "debug", skip(self))]
684    /// Shuts the client down and waits for in-flight work to finish.
685    pub async fn shutdown(self) -> Result<()> {
686        let (reply_tx, reply_rx) = oneshot::channel();
687        self.application_event_handler
688            .send(
689                ConsumerRuntimeEvent::Shutdown { reply: reply_tx },
690                ConsumerError::ThreadStoppedBefore {
691                    operation: "shutdown",
692                },
693            )
694            .await?;
695
696        let result = reply_rx
697            .await
698            .map_err(|_| ConsumerError::ThreadStoppedDuring {
699                operation: "shutdown",
700            })?;
701        self.join.await.map_err(ConsumerError::Join)?;
702        result
703    }
704
705    #[instrument(name = "consumer.warm_up", level = "trace", skip(self))]
706    async fn warm_up(&self) -> Result<()> {
707        let (reply_tx, reply_rx) = oneshot::channel();
708        self.application_event_handler
709            .send(
710                ConsumerRuntimeEvent::WarmUp { reply: reply_tx },
711                ConsumerError::ThreadStoppedBefore {
712                    operation: "startup",
713                },
714            )
715            .await?;
716        reply_rx
717            .await
718            .map_err(|_| ConsumerError::ThreadStoppedDuring {
719                operation: "startup",
720            })?
721    }
722}
723
724struct ConsumerApplicationEventHandler {
725    tx: mpsc::Sender<ConsumerRuntimeEvent>,
726}
727
728impl ConsumerApplicationEventHandler {
729    fn new(tx: mpsc::Sender<ConsumerRuntimeEvent>) -> Self {
730        Self { tx }
731    }
732
733    async fn send(&self, event: ConsumerRuntimeEvent, stopped_error: ConsumerError) -> Result<()> {
734        self.tx.send(event).await.map_err(|_| stopped_error.into())
735    }
736}
737
738enum ConsumerRuntimeEvent {
739    WarmUp {
740        reply: oneshot::Sender<Result<()>>,
741    },
742    Subscribe {
743        topics: Vec<String>,
744        reply: oneshot::Sender<Result<()>>,
745    },
746    SubscribePattern {
747        pattern: SubscriptionPattern,
748        reply: oneshot::Sender<Result<()>>,
749    },
750    SubscribeRegex {
751        pattern: String,
752        reply: oneshot::Sender<Result<()>>,
753    },
754    Unsubscribe {
755        reply: oneshot::Sender<Result<()>>,
756    },
757    Assign {
758        partitions: Vec<TopicPartition>,
759        reply: oneshot::Sender<Result<()>>,
760    },
761    Poll {
762        timeout: Duration,
763        reply: oneshot::Sender<Result<ConsumerRecords>>,
764    },
765    Seek {
766        partition: TopicPartition,
767        offset: i64,
768        reply: oneshot::Sender<Result<()>>,
769    },
770    SeekToBeginning {
771        partitions: Vec<TopicPartition>,
772        reply: oneshot::Sender<Result<()>>,
773    },
774    SeekToEnd {
775        partitions: Vec<TopicPartition>,
776        reply: oneshot::Sender<Result<()>>,
777    },
778    SeekToTimestamp {
779        partitions: Vec<TopicPartitionTimestamp>,
780        reply: oneshot::Sender<Result<()>>,
781    },
782    Position {
783        partition: TopicPartition,
784        reply: oneshot::Sender<Result<i64>>,
785    },
786    Pause {
787        partitions: Vec<TopicPartition>,
788        reply: oneshot::Sender<Result<()>>,
789    },
790    Resume {
791        partitions: Vec<TopicPartition>,
792        reply: oneshot::Sender<Result<()>>,
793    },
794    GroupMetadata {
795        reply: oneshot::Sender<Result<ConsumerGroupMetadata>>,
796    },
797    Assignment {
798        reply: oneshot::Sender<Result<BTreeSet<TopicPartition>>>,
799    },
800    Committed {
801        partitions: Vec<TopicPartition>,
802        reply: oneshot::Sender<Result<Vec<TopicPartitionOffset>>>,
803    },
804    BeginningOffsets {
805        partitions: Vec<TopicPartition>,
806        reply: oneshot::Sender<Result<Vec<TopicPartitionOffset>>>,
807    },
808    EndOffsets {
809        partitions: Vec<TopicPartition>,
810        reply: oneshot::Sender<Result<Vec<TopicPartitionOffset>>>,
811    },
812    OffsetsForTimes {
813        partitions: Vec<TopicPartitionTimestamp>,
814        reply: oneshot::Sender<Result<Vec<TopicPartitionOffsetAndTimestamp>>>,
815    },
816    PartitionsFor {
817        topic: String,
818        reply: oneshot::Sender<Result<Vec<TopicPartitionInfo>>>,
819    },
820    ListTopics {
821        reply: oneshot::Sender<Result<Vec<String>>>,
822    },
823    Commit {
824        offsets: Vec<CommitOffset>,
825        reply: oneshot::Sender<Result<()>>,
826    },
827    Wakeup,
828    Shutdown {
829        reply: oneshot::Sender<Result<()>>,
830    },
831}