mod protocol;
mod runtime;
mod scheduler;
mod share;
mod state;
mod util;
use std::collections::BTreeSet;
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tracing::{debug, instrument};
use crate::config::ConsumerConfig;
use crate::types::{
CommitOffset, ConsumerGroupMetadata, ConsumerRecords, SubscriptionPattern, TopicPartition,
TopicPartitionInfo, TopicPartitionOffset, TopicPartitionOffsetAndTimestamp,
TopicPartitionTimestamp,
};
use crate::{CancellationToken, ConsumerError, Result};
use runtime::ConsumerRuntime;
/// Client-side handle to a consumer whose work is carried out by a spawned
/// runtime task.
///
/// Methods on this type send events to that task and wait for its replies.
pub struct KafkaConsumer {
/// Channel handle used to deliver events to the runtime task.
consumer_runtime: ConsumerRuntimeHandle,
/// Handle of the spawned runtime task: aborted when warm-up fails in
/// `connect`, awaited in `shutdown`.
join: JoinHandle<()>,
/// Timeout that `poll` passes to `poll_for`.
default_poll_timeout: Duration,
}
pub use share::{
AcknowledgeType, AcknowledgementCommitCallback, KafkaShareConsumer, ShareAcknowledgementCommit,
ShareAcquireMode, ShareConsumerOptions, ShareRecord, ShareRecords,
};
impl KafkaConsumer {
    /// Spawns the consumer runtime on a Tokio task and waits for it to warm up.
    ///
    /// Events are delivered to the runtime over a bounded channel of capacity
    /// 64. The returned consumer starts with a default poll timeout of 100 ms;
    /// see [`KafkaConsumer::with_default_poll_timeout`] to change it.
    ///
    /// # Errors
    ///
    /// If warm-up fails, the runtime task is aborted and the warm-up error is
    /// returned.
    #[instrument(
        name = "consumer.connect",
        level = "debug",
        skip(config),
        fields(
            bootstrap_server_count = config.bootstrap_servers.len(),
            client_id = %config.client_id,
            group_id = %config.group_id
        )
    )]
    pub async fn connect(config: ConsumerConfig) -> Result<Self> {
        let (tx, rx) = mpsc::channel(64);
        let runtime = ConsumerRuntime::new(config);
        let join = tokio::spawn(async move {
            runtime.run(rx).await;
        });
        let consumer = Self {
            consumer_runtime: ConsumerRuntimeHandle::new(tx),
            join,
            default_poll_timeout: Duration::from_millis(100),
        };
        if let Err(error) = consumer.warm_up().await {
            // Do not leave the runtime task running behind a failed connect.
            consumer.join.abort();
            return Err(error);
        }
        debug!("consumer connected");
        Ok(consumer)
    }

    /// Returns the consumer with the timeout used by [`KafkaConsumer::poll`]
    /// replaced by `timeout`.
    pub fn with_default_poll_timeout(mut self, timeout: Duration) -> Self {
        self.default_poll_timeout = timeout;
        self
    }

    /// Sends a `Subscribe` request for `topics` and returns the runtime's reply.
    ///
    /// # Errors
    ///
    /// Returns the runtime's error, `ConsumerError::ThreadStoppedBefore` if the
    /// request could not be sent, or `ConsumerError::ThreadStoppedDuring` if the
    /// reply channel was dropped without an answer.
    #[instrument(
        name = "consumer.subscribe",
        level = "debug",
        skip(self, topics),
        fields(topic_count = topics.len())
    )]
    pub async fn subscribe(&self, topics: Vec<String>) -> Result<()> {
        self.request_reply("subscribe", move |reply| {
            ConsumerRuntimeEvent::Subscribe {
                topics,
                cancellation: None,
                reply,
            }
        })
        .await
    }

    /// Sends a `SubscribePattern` request for `pattern` and returns the
    /// runtime's reply.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`KafkaConsumer::subscribe`].
    #[instrument(
        name = "consumer.subscribe_pattern",
        level = "debug",
        skip(self, pattern),
        fields(pattern = %pattern.pattern())
    )]
    pub async fn subscribe_pattern(&self, pattern: SubscriptionPattern) -> Result<()> {
        self.request_reply("subscribe_pattern", move |reply| {
            ConsumerRuntimeEvent::SubscribePattern {
                pattern,
                cancellation: None,
                reply,
            }
        })
        .await
    }

    /// Sends a `SubscribeRegex` request carrying the raw `pattern` string and
    /// returns the runtime's reply.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`KafkaConsumer::subscribe`].
    #[instrument(
        name = "consumer.subscribe_regex",
        level = "debug",
        skip(self, pattern),
        fields(pattern = %pattern)
    )]
    pub async fn subscribe_regex(&self, pattern: String) -> Result<()> {
        self.request_reply("subscribe_regex", move |reply| {
            ConsumerRuntimeEvent::SubscribeRegex {
                pattern,
                cancellation: None,
                reply,
            }
        })
        .await
    }

    /// Sends an `Unsubscribe` request and returns the runtime's reply.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`KafkaConsumer::subscribe`].
    #[instrument(name = "consumer.unsubscribe", level = "debug", skip(self))]
    pub async fn unsubscribe(&self) -> Result<()> {
        self.request_reply("unsubscribe", |reply| ConsumerRuntimeEvent::Unsubscribe {
            cancellation: None,
            reply,
        })
        .await
    }

    /// Polls with the consumer's default poll timeout.
    ///
    /// Equivalent to calling [`KafkaConsumer::poll_for`] with that timeout.
    pub async fn poll(&self) -> Result<ConsumerRecords> {
        self.poll_for(self.default_poll_timeout).await
    }

    /// Sends a `Poll` request with the given `timeout` and returns the records
    /// the runtime replies with.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`KafkaConsumer::subscribe`].
    #[instrument(
        name = "consumer.poll",
        level = "debug",
        skip(self),
        fields(timeout_ms = timeout.as_millis())
    )]
    pub async fn poll_for(&self, timeout: Duration) -> Result<ConsumerRecords> {
        self.request_reply("poll", move |reply| ConsumerRuntimeEvent::Poll {
            timeout,
            cancellation: None,
            reply,
        })
        .await
    }

    /// Sends an `Assign` request for `partitions` and returns the runtime's
    /// reply.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`KafkaConsumer::subscribe`].
    #[instrument(
        name = "consumer.assign",
        level = "debug",
        skip(self, partitions),
        fields(partition_count = partitions.len())
    )]
    pub async fn assign(&self, partitions: Vec<TopicPartition>) -> Result<()> {
        self.request_reply("assign", move |reply| ConsumerRuntimeEvent::Assign {
            partitions,
            cancellation: None,
            reply,
        })
        .await
    }

    /// Sends a `Seek` request for `partition` at `offset` and returns the
    /// runtime's reply.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`KafkaConsumer::subscribe`].
    #[instrument(
        name = "consumer.seek",
        level = "debug",
        skip(self, partition),
        fields(topic = %partition.topic, partition_id = partition.partition, offset)
    )]
    pub async fn seek(&self, partition: TopicPartition, offset: i64) -> Result<()> {
        self.request_reply("seek", move |reply| ConsumerRuntimeEvent::Seek {
            partition,
            offset,
            cancellation: None,
            reply,
        })
        .await
    }

    /// Sends a `SeekToBeginning` request for `partitions` and returns the
    /// runtime's reply.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`KafkaConsumer::subscribe`].
    #[instrument(
        name = "consumer.seek_to_beginning",
        level = "debug",
        skip(self, partitions),
        fields(partition_count = partitions.len())
    )]
    pub async fn seek_to_beginning(&self, partitions: Vec<TopicPartition>) -> Result<()> {
        self.request_reply("seek_to_beginning", move |reply| {
            ConsumerRuntimeEvent::SeekToBeginning {
                partitions,
                cancellation: None,
                reply,
            }
        })
        .await
    }

    /// Sends a `SeekToEnd` request for `partitions` and returns the runtime's
    /// reply.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`KafkaConsumer::subscribe`].
    #[instrument(
        name = "consumer.seek_to_end",
        level = "debug",
        skip(self, partitions),
        fields(partition_count = partitions.len())
    )]
    pub async fn seek_to_end(&self, partitions: Vec<TopicPartition>) -> Result<()> {
        self.request_reply("seek_to_end", move |reply| {
            ConsumerRuntimeEvent::SeekToEnd {
                partitions,
                cancellation: None,
                reply,
            }
        })
        .await
    }

    /// Sends a `SeekToTimestamp` request for `partitions` and returns the
    /// runtime's reply.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`KafkaConsumer::subscribe`].
    #[instrument(
        name = "consumer.seek_to_timestamp",
        level = "debug",
        skip(self, partitions),
        fields(partition_count = partitions.len())
    )]
    pub async fn seek_to_timestamp(&self, partitions: Vec<TopicPartitionTimestamp>) -> Result<()> {
        self.request_reply("seek_to_timestamp", move |reply| {
            ConsumerRuntimeEvent::SeekToTimestamp {
                partitions,
                cancellation: None,
                reply,
            }
        })
        .await
    }

    /// Sends a `Position` request for `partition` and returns the offset the
    /// runtime replies with.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`KafkaConsumer::subscribe`].
    #[instrument(
        name = "consumer.position",
        level = "debug",
        skip(self, partition),
        fields(topic = %partition.topic, partition_id = partition.partition)
    )]
    pub async fn position(&self, partition: TopicPartition) -> Result<i64> {
        self.request_reply("position", move |reply| ConsumerRuntimeEvent::Position {
            partition,
            cancellation: None,
            reply,
        })
        .await
    }

    /// Sends a `Pause` request for `partitions` and returns the runtime's reply.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`KafkaConsumer::subscribe`].
    #[instrument(
        name = "consumer.pause",
        level = "debug",
        skip(self, partitions),
        fields(partition_count = partitions.len())
    )]
    pub async fn pause(&self, partitions: Vec<TopicPartition>) -> Result<()> {
        self.request_reply("pause", move |reply| ConsumerRuntimeEvent::Pause {
            partitions,
            cancellation: None,
            reply,
        })
        .await
    }

    /// Sends a `Resume` request for `partitions` and returns the runtime's
    /// reply.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`KafkaConsumer::subscribe`].
    #[instrument(
        name = "consumer.resume",
        level = "debug",
        skip(self, partitions),
        fields(partition_count = partitions.len())
    )]
    pub async fn resume(&self, partitions: Vec<TopicPartition>) -> Result<()> {
        self.request_reply("resume", move |reply| ConsumerRuntimeEvent::Resume {
            partitions,
            cancellation: None,
            reply,
        })
        .await
    }

    /// Sends a `GroupMetadata` request and returns the metadata the runtime
    /// replies with.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`KafkaConsumer::subscribe`].
    #[instrument(name = "consumer.group_metadata", level = "debug", skip(self))]
    pub async fn group_metadata(&self) -> Result<ConsumerGroupMetadata> {
        self.request_reply("group_metadata", |reply| {
            ConsumerRuntimeEvent::GroupMetadata {
                cancellation: None,
                reply,
            }
        })
        .await
    }

    /// Sends an `Assignment` request and returns the set of partitions the
    /// runtime replies with.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`KafkaConsumer::subscribe`].
    #[instrument(name = "consumer.assignment", level = "debug", skip(self))]
    pub async fn assignment(&self) -> Result<BTreeSet<TopicPartition>> {
        self.request_reply("assignment", |reply| ConsumerRuntimeEvent::Assignment {
            cancellation: None,
            reply,
        })
        .await
    }

    /// Sends a `Committed` request for `partitions` and returns the offsets the
    /// runtime replies with.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`KafkaConsumer::subscribe`].
    #[instrument(
        name = "consumer.committed_offsets",
        level = "debug",
        skip(self, partitions),
        fields(partition_count = partitions.len())
    )]
    pub async fn committed(
        &self,
        partitions: Vec<TopicPartition>,
    ) -> Result<Vec<TopicPartitionOffset>> {
        self.request_reply("committed", move |reply| {
            ConsumerRuntimeEvent::Committed {
                partitions,
                cancellation: None,
                reply,
            }
        })
        .await
    }

    /// Sends a `BeginningOffsets` request for `partitions` and returns the
    /// offsets the runtime replies with.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`KafkaConsumer::subscribe`].
    #[instrument(
        name = "consumer.beginning_offsets",
        level = "debug",
        skip(self, partitions),
        fields(partition_count = partitions.len())
    )]
    pub async fn beginning_offsets(
        &self,
        partitions: Vec<TopicPartition>,
    ) -> Result<Vec<TopicPartitionOffset>> {
        self.request_reply("beginning_offsets", move |reply| {
            ConsumerRuntimeEvent::BeginningOffsets {
                partitions,
                cancellation: None,
                reply,
            }
        })
        .await
    }

    /// Sends an `EndOffsets` request for `partitions` and returns the offsets
    /// the runtime replies with.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`KafkaConsumer::subscribe`].
    #[instrument(
        name = "consumer.end_offsets",
        level = "debug",
        skip(self, partitions),
        fields(partition_count = partitions.len())
    )]
    pub async fn end_offsets(
        &self,
        partitions: Vec<TopicPartition>,
    ) -> Result<Vec<TopicPartitionOffset>> {
        self.request_reply("end_offsets", move |reply| {
            ConsumerRuntimeEvent::EndOffsets {
                partitions,
                cancellation: None,
                reply,
            }
        })
        .await
    }

    /// Sends an `OffsetsForTimes` request for `partitions` and returns the
    /// offset/timestamp entries the runtime replies with.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`KafkaConsumer::subscribe`].
    #[instrument(
        name = "consumer.offsets_for_times",
        level = "debug",
        skip(self, partitions),
        fields(partition_count = partitions.len())
    )]
    pub async fn offsets_for_times(
        &self,
        partitions: Vec<TopicPartitionTimestamp>,
    ) -> Result<Vec<TopicPartitionOffsetAndTimestamp>> {
        self.request_reply("offsets_for_times", move |reply| {
            ConsumerRuntimeEvent::OffsetsForTimes {
                partitions,
                cancellation: None,
                reply,
            }
        })
        .await
    }

    /// Sends a `PartitionsFor` request for `topic` and returns the partition
    /// descriptions the runtime replies with.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`KafkaConsumer::subscribe`].
    #[instrument(
        name = "consumer.partitions_for",
        level = "debug",
        skip(self),
        fields(topic = %topic)
    )]
    pub async fn partitions_for(&self, topic: String) -> Result<Vec<TopicPartitionInfo>> {
        self.request_reply("partitions_for", move |reply| {
            ConsumerRuntimeEvent::PartitionsFor {
                topic,
                cancellation: None,
                reply,
            }
        })
        .await
    }

    /// Sends a `ListTopics` request and returns the topic names the runtime
    /// replies with.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`KafkaConsumer::subscribe`].
    #[instrument(name = "consumer.list_topics", level = "debug", skip(self))]
    pub async fn list_topics(&self) -> Result<Vec<String>> {
        self.request_reply("list_topics", |reply| ConsumerRuntimeEvent::ListTopics {
            cancellation: None,
            reply,
        })
        .await
    }

    /// Commits the offsets produced by `records.commit_offsets()`.
    ///
    /// See [`KafkaConsumer::commit_offsets`] for the behaviour and errors.
    pub async fn commit(&self, records: &ConsumerRecords) -> Result<()> {
        self.commit_offsets(records.commit_offsets()).await
    }

    /// Sends a `Commit` request for `offsets` and returns the runtime's reply.
    ///
    /// An empty `offsets` list returns `Ok(())` immediately without contacting
    /// the runtime.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`KafkaConsumer::subscribe`]; the operation is
    /// labelled `"commit"` in the `ThreadStopped*` errors.
    #[instrument(
        name = "consumer.commit_offsets",
        level = "debug",
        skip(self, offsets),
        fields(offset_count = offsets.len())
    )]
    pub async fn commit_offsets(&self, offsets: Vec<CommitOffset>) -> Result<()> {
        if offsets.is_empty() {
            return Ok(());
        }
        self.request_reply("commit", move |reply| ConsumerRuntimeEvent::Commit {
            offsets,
            cancellation: None,
            reply,
        })
        .await
    }

    /// Sends a `Wakeup` event to the runtime without waiting for any reply.
    ///
    /// # Errors
    ///
    /// Returns `ConsumerError::ThreadStoppedBefore` if the event could not be
    /// sent.
    #[instrument(name = "consumer.wakeup", level = "debug", skip(self))]
    pub async fn wakeup(&self) -> Result<()> {
        self.consumer_runtime
            .send(
                ConsumerRuntimeEvent::Wakeup,
                ConsumerError::ThreadStoppedBefore {
                    operation: "wakeup",
                },
            )
            .await
    }

    /// Sends a `Shutdown` request, waits for the runtime's reply, then waits
    /// for the runtime task itself to finish.
    ///
    /// # Errors
    ///
    /// Returns `ConsumerError::ThreadStoppedBefore` or
    /// `ConsumerError::ThreadStoppedDuring` (without joining the task) if the
    /// request or its reply could not be exchanged, `ConsumerError::Join` if
    /// awaiting the task fails, and otherwise the runtime's own shutdown result.
    #[instrument(name = "consumer.shutdown", level = "debug", skip(self))]
    pub async fn shutdown(self) -> Result<()> {
        // Kept explicit rather than using `request_reply`: a dropped reply must
        // return early, while the runtime's own result (even an error) is only
        // returned after the task has been joined.
        let (reply_tx, reply_rx) = oneshot::channel();
        self.consumer_runtime
            .send(
                ConsumerRuntimeEvent::Shutdown { reply: reply_tx },
                ConsumerError::ThreadStoppedBefore {
                    operation: "shutdown",
                },
            )
            .await?;
        let result = reply_rx
            .await
            .map_err(|_| ConsumerError::ThreadStoppedDuring {
                operation: "shutdown",
            })?;
        self.join.await.map_err(ConsumerError::Join)?;
        result
    }

    /// Sends a `WarmUp` request and returns the runtime's reply.
    ///
    /// Used by [`KafkaConsumer::connect`]; failures to reach the runtime are
    /// labelled with the operation name `"startup"`.
    #[instrument(name = "consumer.warm_up", level = "trace", skip(self))]
    async fn warm_up(&self) -> Result<()> {
        self.request_reply("startup", |reply| ConsumerRuntimeEvent::WarmUp {
            cancellation: None,
            reply,
        })
        .await
    }

    /// Sends one event to the runtime task and waits for its reply.
    ///
    /// `make_event` receives the reply sender and builds the event to send.
    /// `operation` labels the error reported when the runtime cannot be
    /// reached: `ConsumerError::ThreadStoppedBefore` if the event could not be
    /// sent, `ConsumerError::ThreadStoppedDuring` if the reply sender was
    /// dropped without answering. Otherwise the runtime's own result is
    /// returned unchanged.
    async fn request_reply<T>(
        &self,
        operation: &'static str,
        make_event: impl FnOnce(oneshot::Sender<Result<T>>) -> ConsumerRuntimeEvent,
    ) -> Result<T> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.consumer_runtime
            .send(
                make_event(reply_tx),
                ConsumerError::ThreadStoppedBefore { operation },
            )
            .await?;
        // The `?` handles a dropped reply channel; what remains is the
        // runtime's own `Result`, which becomes this function's return value.
        reply_rx
            .await
            .map_err(|_| ConsumerError::ThreadStoppedDuring { operation })?
    }
}
/// Sending side of the channel that carries `ConsumerRuntimeEvent`s to the
/// runtime task.
struct ConsumerRuntimeHandle {
/// Sender half of the event channel.
tx: mpsc::Sender<ConsumerRuntimeEvent>,
}
impl ConsumerRuntimeHandle {
fn new(tx: mpsc::Sender<ConsumerRuntimeEvent>) -> Self {
Self { tx }
}
async fn send(&self, event: ConsumerRuntimeEvent, stopped_error: ConsumerError) -> Result<()> {
self.tx.send(event).await.map_err(|_| stopped_error.into())
}
}
/// Events sent from the consumer handle to the runtime task.
///
/// Most variants share two fields:
/// - `cancellation`: optional token consulted by `is_cancelled`; the
///   `KafkaConsumer` methods in this file always set it to `None`.
/// - `reply`: oneshot sender on which the result of the request is delivered.
///
/// `Wakeup` carries neither, and `Shutdown` carries only `reply`.
enum ConsumerRuntimeEvent {
/// Sent by `KafkaConsumer::warm_up` during `connect`.
WarmUp {
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<()>>,
},
/// Sent by `KafkaConsumer::subscribe` with the topic names.
Subscribe {
topics: Vec<String>,
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<()>>,
},
/// Sent by `KafkaConsumer::subscribe_pattern`.
SubscribePattern {
pattern: SubscriptionPattern,
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<()>>,
},
/// Sent by `KafkaConsumer::subscribe_regex` with the raw pattern string.
SubscribeRegex {
pattern: String,
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<()>>,
},
/// Sent by `KafkaConsumer::unsubscribe`.
Unsubscribe {
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<()>>,
},
/// Sent by `KafkaConsumer::assign`.
Assign {
partitions: Vec<TopicPartition>,
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<()>>,
},
/// Sent by `KafkaConsumer::poll_for`; the reply carries the polled records.
Poll {
timeout: Duration,
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<ConsumerRecords>>,
},
/// Sent by `KafkaConsumer::seek`.
Seek {
partition: TopicPartition,
offset: i64,
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<()>>,
},
/// Sent by `KafkaConsumer::seek_to_beginning`.
SeekToBeginning {
partitions: Vec<TopicPartition>,
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<()>>,
},
/// Sent by `KafkaConsumer::seek_to_end`.
SeekToEnd {
partitions: Vec<TopicPartition>,
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<()>>,
},
/// Sent by `KafkaConsumer::seek_to_timestamp`.
SeekToTimestamp {
partitions: Vec<TopicPartitionTimestamp>,
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<()>>,
},
/// Sent by `KafkaConsumer::position`; the reply carries an `i64` offset.
Position {
partition: TopicPartition,
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<i64>>,
},
/// Sent by `KafkaConsumer::pause`.
Pause {
partitions: Vec<TopicPartition>,
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<()>>,
},
/// Sent by `KafkaConsumer::resume`.
Resume {
partitions: Vec<TopicPartition>,
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<()>>,
},
/// Sent by `KafkaConsumer::group_metadata`.
GroupMetadata {
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<ConsumerGroupMetadata>>,
},
/// Sent by `KafkaConsumer::assignment`; the reply carries a set of partitions.
Assignment {
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<BTreeSet<TopicPartition>>>,
},
/// Sent by `KafkaConsumer::committed`.
Committed {
partitions: Vec<TopicPartition>,
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<Vec<TopicPartitionOffset>>>,
},
/// Sent by `KafkaConsumer::beginning_offsets`.
BeginningOffsets {
partitions: Vec<TopicPartition>,
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<Vec<TopicPartitionOffset>>>,
},
/// Sent by `KafkaConsumer::end_offsets`.
EndOffsets {
partitions: Vec<TopicPartition>,
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<Vec<TopicPartitionOffset>>>,
},
/// Sent by `KafkaConsumer::offsets_for_times`.
OffsetsForTimes {
partitions: Vec<TopicPartitionTimestamp>,
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<Vec<TopicPartitionOffsetAndTimestamp>>>,
},
/// Sent by `KafkaConsumer::partitions_for`.
PartitionsFor {
topic: String,
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<Vec<TopicPartitionInfo>>>,
},
/// Sent by `KafkaConsumer::list_topics`; the reply carries topic names.
ListTopics {
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<Vec<String>>>,
},
/// Sent by `KafkaConsumer::commit_offsets`.
Commit {
offsets: Vec<CommitOffset>,
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<()>>,
},
/// Sent by `KafkaConsumer::wakeup`; fire-and-forget, no reply channel.
Wakeup,
/// Sent by `KafkaConsumer::shutdown`; has a reply but no cancellation token.
Shutdown {
reply: oneshot::Sender<Result<()>>,
},
}
impl ConsumerRuntimeEvent {
fn is_cancelled(&self) -> bool {
match self {
Self::WarmUp { cancellation, .. }
| Self::Subscribe { cancellation, .. }
| Self::SubscribePattern { cancellation, .. }
| Self::SubscribeRegex { cancellation, .. }
| Self::Unsubscribe { cancellation, .. }
| Self::Assign { cancellation, .. }
| Self::Poll { cancellation, .. }
| Self::Seek { cancellation, .. }
| Self::SeekToBeginning { cancellation, .. }
| Self::SeekToEnd { cancellation, .. }
| Self::SeekToTimestamp { cancellation, .. }
| Self::Position { cancellation, .. }
| Self::Pause { cancellation, .. }
| Self::Resume { cancellation, .. }
| Self::GroupMetadata { cancellation, .. }
| Self::Assignment { cancellation, .. }
| Self::Committed { cancellation, .. }
| Self::BeginningOffsets { cancellation, .. }
| Self::EndOffsets { cancellation, .. }
| Self::OffsetsForTimes { cancellation, .. }
| Self::PartitionsFor { cancellation, .. }
| Self::ListTopics { cancellation, .. }
| Self::Commit { cancellation, .. } => cancellation
.as_ref()
.is_some_and(CancellationToken::is_cancelled),
Self::Wakeup | Self::Shutdown { .. } => false,
}
}
fn send_cancelled(self) {
match self {
Self::WarmUp { reply, .. }
| Self::Subscribe { reply, .. }
| Self::SubscribePattern { reply, .. }
| Self::SubscribeRegex { reply, .. }
| Self::Unsubscribe { reply, .. }
| Self::Assign { reply, .. }
| Self::Seek { reply, .. }
| Self::SeekToBeginning { reply, .. }
| Self::SeekToEnd { reply, .. }
| Self::SeekToTimestamp { reply, .. }
| Self::Pause { reply, .. }
| Self::Resume { reply, .. }
| Self::Commit { reply, .. } => {
let _ = reply.send(Err(crate::Error::Cancelled));
}
Self::Poll { reply, .. } => {
let _ = reply.send(Err(crate::Error::Cancelled));
}
Self::Position { reply, .. } => {
let _ = reply.send(Err(crate::Error::Cancelled));
}
Self::GroupMetadata { reply, .. } => {
let _ = reply.send(Err(crate::Error::Cancelled));
}
Self::Assignment { reply, .. } => {
let _ = reply.send(Err(crate::Error::Cancelled));
}
Self::Committed { reply, .. }
| Self::BeginningOffsets { reply, .. }
| Self::EndOffsets { reply, .. } => {
let _ = reply.send(Err(crate::Error::Cancelled));
}
Self::OffsetsForTimes { reply, .. } => {
let _ = reply.send(Err(crate::Error::Cancelled));
}
Self::PartitionsFor { reply, .. } => {
let _ = reply.send(Err(crate::Error::Cancelled));
}
Self::ListTopics { reply, .. } => {
let _ = reply.send(Err(crate::Error::Cancelled));
}
Self::Wakeup | Self::Shutdown { .. } => {}
}
}
fn send_failed(self, message: &str) {
let error = || {
crate::Error::Consumer(crate::ConsumerError::Fatal {
message: message.to_owned(),
})
};
match self {
Self::WarmUp { reply, .. }
| Self::Subscribe { reply, .. }
| Self::SubscribePattern { reply, .. }
| Self::SubscribeRegex { reply, .. }
| Self::Unsubscribe { reply, .. }
| Self::Assign { reply, .. }
| Self::Seek { reply, .. }
| Self::SeekToBeginning { reply, .. }
| Self::SeekToEnd { reply, .. }
| Self::SeekToTimestamp { reply, .. }
| Self::Pause { reply, .. }
| Self::Resume { reply, .. }
| Self::Commit { reply, .. } => {
let _ = reply.send(Err(error()));
}
Self::Poll { reply, .. } => {
let _ = reply.send(Err(error()));
}
Self::Position { reply, .. } => {
let _ = reply.send(Err(error()));
}
Self::GroupMetadata { reply, .. } => {
let _ = reply.send(Err(error()));
}
Self::Assignment { reply, .. } => {
let _ = reply.send(Err(error()));
}
Self::Committed { reply, .. }
| Self::BeginningOffsets { reply, .. }
| Self::EndOffsets { reply, .. } => {
let _ = reply.send(Err(error()));
}
Self::OffsetsForTimes { reply, .. } => {
let _ = reply.send(Err(error()));
}
Self::PartitionsFor { reply, .. } => {
let _ = reply.send(Err(error()));
}
Self::ListTopics { reply, .. } => {
let _ = reply.send(Err(error()));
}
Self::Wakeup | Self::Shutdown { .. } => {}
}
}
}