use super::*;
impl ConsumerRuntime {
/// Moves every event that is immediately available on `rx` onto the back of
/// `lifecycle.runtime_events`, without waiting for new events.
///
/// Draining stops at the first `try_recv` error; the error itself is not
/// inspected or reported here.
pub(crate) fn drain_runtime_events(&mut self, rx: &mut mpsc::Receiver<ConsumerRuntimeEvent>) {
    loop {
        match rx.try_recv() {
            Ok(incoming) => self.lifecycle.runtime_events.push_back(incoming),
            Err(_) => break,
        }
    }
}
pub(crate) async fn process_runtime_events(&mut self) -> AnyResult<()> {
while let Some(event) = self.lifecycle.runtime_events.pop_front() {
if event.is_cancelled() {
event.send_cancelled();
continue;
}
if let Some(message) = self.lifecycle.fatal_error.clone() {
match event {
ConsumerRuntimeEvent::Wakeup => self.wakeup(),
ConsumerRuntimeEvent::Shutdown { reply } => {
self.lifecycle.shutting_down = true;
self.lifecycle.close_reply = Some(reply);
self.fail_pending_poll(Error::Consumer(ConsumerError::ShuttingDown));
}
event => event.send_failed(&message),
}
continue;
}
trace!(
event = consumer_event_name(&event),
pending_events = self.lifecycle.runtime_events.len(),
"processing consumer application event"
);
match event {
ConsumerRuntimeEvent::WarmUp { reply, .. } => {
let result = connect_to_any_bootstrap(
&self.config.bootstrap_servers,
&self.config.client_id,
self.config.request_timeout,
self.config.security_protocol,
&self.config.tls,
&self.config.sasl,
&self.config.tcp_connector,
)
.await
.map(|_| ())
.map_err(Error::from);
let _ = reply.send(result);
}
ConsumerRuntimeEvent::Subscribe { topics, reply, .. } => {
let result = self.subscribe(topics);
let _ = reply.send(result);
}
ConsumerRuntimeEvent::SubscribePattern { pattern, reply, .. } => {
let result = self.subscribe_pattern(pattern);
let _ = reply.send(result);
}
ConsumerRuntimeEvent::SubscribeRegex { pattern, reply, .. } => {
let result = self.subscribe_regex(pattern).await;
let _ = reply.send(result);
}
ConsumerRuntimeEvent::Unsubscribe { reply, .. } => {
let result = self.assign(Vec::new()).await;
let _ = reply.send(result);
}
ConsumerRuntimeEvent::Assign {
partitions, reply, ..
} => {
let result = self.assign(partitions).await;
let _ = reply.send(result);
}
ConsumerRuntimeEvent::Poll {
timeout,
cancellation,
reply,
} => {
if let Err(error) = self.register_poll(timeout, cancellation, reply) {
self.fail_pending_poll(error);
}
}
ConsumerRuntimeEvent::Seek {
partition,
offset,
reply,
..
} => {
let result = self.seek(partition.into(), offset);
let _ = reply.send(result);
}
ConsumerRuntimeEvent::SeekToBeginning {
partitions, reply, ..
} => {
let result = self
.seek_to_timestamp(partitions, LIST_OFFSETS_EARLIEST)
.await;
let _ = reply.send(result);
}
ConsumerRuntimeEvent::SeekToEnd {
partitions, reply, ..
} => {
let result = self
.seek_to_timestamp(partitions, LIST_OFFSETS_LATEST)
.await;
let _ = reply.send(result);
}
ConsumerRuntimeEvent::SeekToTimestamp {
partitions, reply, ..
} => {
let result = self.seek_partitions_to_timestamp(partitions).await;
let _ = reply.send(result);
}
ConsumerRuntimeEvent::Position {
partition, reply, ..
} => {
let result = self.position(&partition.into());
let _ = reply.send(result);
}
ConsumerRuntimeEvent::Pause {
partitions, reply, ..
} => {
let result = self.pause(partitions);
let _ = reply.send(result);
}
ConsumerRuntimeEvent::Resume {
partitions, reply, ..
} => {
let result = self.resume(partitions);
let _ = reply.send(result);
}
ConsumerRuntimeEvent::GroupMetadata { reply, .. } => {
let _ = reply.send(Ok(self.current_group_metadata()));
}
ConsumerRuntimeEvent::Assignment { reply, .. } => {
let _ = reply.send(Ok(self.current_assignment()));
}
ConsumerRuntimeEvent::Committed {
partitions, reply, ..
} => {
let result = self.committed_offsets(partitions).await;
let _ = reply.send(result);
}
ConsumerRuntimeEvent::BeginningOffsets {
partitions, reply, ..
} => {
let result = self.beginning_offsets(partitions).await;
let _ = reply.send(result);
}
ConsumerRuntimeEvent::EndOffsets {
partitions, reply, ..
} => {
let result = self.end_offsets(partitions).await;
let _ = reply.send(result);
}
ConsumerRuntimeEvent::OffsetsForTimes {
partitions, reply, ..
} => {
let result = self.offsets_for_times(partitions).await;
let _ = reply.send(result);
}
ConsumerRuntimeEvent::PartitionsFor { topic, reply, .. } => {
let result = self.partitions_for(topic).await;
let _ = reply.send(result);
}
ConsumerRuntimeEvent::ListTopics { reply, .. } => {
let result = self.list_topics().await;
let _ = reply.send(result);
}
ConsumerRuntimeEvent::Commit {
offsets,
cancellation,
reply,
} => {
self.lifecycle.pending_commits.push_back(PendingCommit {
offsets,
cancellation,
reply: Some(reply),
kind: CommitKind::Manual,
});
}
ConsumerRuntimeEvent::Wakeup => {
self.wakeup();
}
ConsumerRuntimeEvent::Shutdown { reply } => {
self.lifecycle.shutting_down = true;
self.lifecycle.close_reply = Some(reply);
self.fail_pending_poll(Error::Consumer(ConsumerError::ShuttingDown));
}
}
}
Ok(())
}
/// Runs a single network action and returns the result of the operation it
/// maps to.
///
/// The action's name is traced first; each `ConsumerNetworkAction` variant
/// is then forwarded to exactly one runtime method, whose result is returned
/// unchanged.
pub(crate) async fn execute_network_action(
    &mut self,
    action: ConsumerNetworkAction,
) -> AnyResult<()> {
    trace!(
        action = consumer_network_action_name(&action),
        "executing consumer network action"
    );
    match action {
        // The only variant that carries data: the leader and its partitions.
        ConsumerNetworkAction::Fetch {
            leader_id: leader,
            partitions: targets,
        } => {
            self.fetch_from_leader(leader, targets).await
        }
        ConsumerNetworkAction::Commit => {
            self.drain_commit_request().await
        }
        ConsumerNetworkAction::Heartbeat => {
            self.send_heartbeat().await
        }
        ConsumerNetworkAction::EnsureCoordinator => {
            self.ensure_coordinator_connection().await
        }
        ConsumerNetworkAction::RefreshMetadata => {
            self.refresh_subscription_metadata().await
        }
        ConsumerNetworkAction::LeaveGroup => {
            self.leave_group().await
        }
    }
}
}
/// Checks that events whose cancellation token has already fired are answered
/// with `Error::Cancelled` instead of being acted upon.
#[cfg(test)]
mod cancellation_tests {
    use super::*;
    use crate::CancellationToken;
    use crate::config::ConsumerConfig;
    use tokio::sync::oneshot;

    /// Returns a token on which `cancel` has already been called.
    fn pre_cancelled_token() -> CancellationToken {
        let token = CancellationToken::new();
        token.cancel();
        token
    }

    #[tokio::test]
    async fn cancelled_list_topics_event_is_abandoned_before_network_work() {
        let config = ConsumerConfig::new("localhost:9092", "group-a");
        let mut runtime = ConsumerRuntime::new(config);
        let (reply, outcome) = oneshot::channel();
        let event = ConsumerRuntimeEvent::ListTopics {
            cancellation: Some(pre_cancelled_token()),
            reply,
        };
        runtime.lifecycle.runtime_events.push_back(event);

        runtime.process_runtime_events().await.unwrap();

        // The event must be consumed and answered with a cancellation error.
        assert!(runtime.lifecycle.runtime_events.is_empty());
        assert!(matches!(outcome.await, Ok(Err(Error::Cancelled))));
    }

    #[tokio::test]
    async fn cancelled_poll_event_does_not_register_pending_poll() {
        let config = ConsumerConfig::new("localhost:9092", "group-a");
        let mut runtime = ConsumerRuntime::new(config);
        let (reply, outcome) = oneshot::channel();
        let event = ConsumerRuntimeEvent::Poll {
            timeout: Duration::from_secs(30),
            cancellation: Some(pre_cancelled_token()),
            reply,
        };
        runtime.lifecycle.runtime_events.push_back(event);

        runtime.process_runtime_events().await.unwrap();

        // No poll may be left pending, and the caller must see the cancellation.
        assert!(runtime.poll_state.pending_poll.is_none());
        assert!(matches!(outcome.await, Ok(Err(Error::Cancelled))));
    }
}