use super::*;
impl ConsumerRuntime {
pub(crate) fn drain_runtime_events(&mut self, rx: &mut mpsc::Receiver<ConsumerRuntimeEvent>) {
while let Ok(event) = rx.try_recv() {
self.lifecycle.runtime_events.push_back(event);
}
}
pub(crate) async fn process_runtime_events(&mut self) -> AnyResult<()> {
while let Some(event) = self.lifecycle.runtime_events.pop_front() {
trace!(
event = consumer_event_name(&event),
pending_events = self.lifecycle.runtime_events.len(),
"processing consumer application event"
);
match event {
ConsumerRuntimeEvent::WarmUp { reply } => {
let result = connect_to_any_bootstrap(
&self.config.bootstrap_servers,
&self.config.client_id,
self.config.request_timeout,
self.config.security_protocol,
&self.config.tls,
&self.config.sasl,
)
.await
.map(|_| ())
.map_err(Error::from);
let _ = reply.send(result);
}
ConsumerRuntimeEvent::Subscribe { topics, reply } => {
let result = self.subscribe(topics);
let _ = reply.send(result);
}
ConsumerRuntimeEvent::SubscribePattern { pattern, reply } => {
let result = self.subscribe_pattern(pattern);
let _ = reply.send(result);
}
ConsumerRuntimeEvent::SubscribeRegex { pattern, reply } => {
let result = self.subscribe_regex(pattern).await;
let _ = reply.send(result);
}
ConsumerRuntimeEvent::Unsubscribe { reply } => {
let result = self.assign(Vec::new()).await;
let _ = reply.send(result);
}
ConsumerRuntimeEvent::Assign { partitions, reply } => {
let result = self.assign(partitions).await;
let _ = reply.send(result);
}
ConsumerRuntimeEvent::Poll { timeout, reply } => {
if let Err(error) = self.register_poll(timeout, reply) {
self.fail_pending_poll(error);
}
}
ConsumerRuntimeEvent::Seek {
partition,
offset,
reply,
} => {
let result = self.seek(partition.into(), offset);
let _ = reply.send(result);
}
ConsumerRuntimeEvent::SeekToBeginning { partitions, reply } => {
let result = self
.seek_to_timestamp(partitions, LIST_OFFSETS_EARLIEST)
.await;
let _ = reply.send(result);
}
ConsumerRuntimeEvent::SeekToEnd { partitions, reply } => {
let result = self
.seek_to_timestamp(partitions, LIST_OFFSETS_LATEST)
.await;
let _ = reply.send(result);
}
ConsumerRuntimeEvent::SeekToTimestamp { partitions, reply } => {
let result = self.seek_partitions_to_timestamp(partitions).await;
let _ = reply.send(result);
}
ConsumerRuntimeEvent::Position { partition, reply } => {
let result = self.position(&partition.into());
let _ = reply.send(result);
}
ConsumerRuntimeEvent::Pause { partitions, reply } => {
let result = self.pause(partitions);
let _ = reply.send(result);
}
ConsumerRuntimeEvent::Resume { partitions, reply } => {
let result = self.resume(partitions);
let _ = reply.send(result);
}
ConsumerRuntimeEvent::GroupMetadata { reply } => {
let _ = reply.send(Ok(self.current_group_metadata()));
}
ConsumerRuntimeEvent::Assignment { reply } => {
let _ = reply.send(Ok(self.current_assignment()));
}
ConsumerRuntimeEvent::Committed { partitions, reply } => {
let result = self.committed_offsets(partitions).await;
let _ = reply.send(result);
}
ConsumerRuntimeEvent::BeginningOffsets { partitions, reply } => {
let result = self.beginning_offsets(partitions).await;
let _ = reply.send(result);
}
ConsumerRuntimeEvent::EndOffsets { partitions, reply } => {
let result = self.end_offsets(partitions).await;
let _ = reply.send(result);
}
ConsumerRuntimeEvent::OffsetsForTimes { partitions, reply } => {
let result = self.offsets_for_times(partitions).await;
let _ = reply.send(result);
}
ConsumerRuntimeEvent::PartitionsFor { topic, reply } => {
let result = self.partitions_for(topic).await;
let _ = reply.send(result);
}
ConsumerRuntimeEvent::ListTopics { reply } => {
let result = self.list_topics().await;
let _ = reply.send(result);
}
ConsumerRuntimeEvent::Commit { offsets, reply } => {
self.lifecycle.pending_commits.push_back(PendingCommit {
offsets,
reply: Some(reply),
kind: CommitKind::Manual,
});
}
ConsumerRuntimeEvent::Wakeup => {
self.wakeup();
}
ConsumerRuntimeEvent::Shutdown { reply } => {
self.lifecycle.shutting_down = true;
self.lifecycle.close_reply = Some(reply);
self.fail_pending_poll(Error::Internal(anyhow!("consumer is shutting down")));
}
}
}
Ok(())
}
pub(crate) async fn execute_network_action(
&mut self,
action: ConsumerNetworkAction,
) -> AnyResult<()> {
trace!(
action = consumer_network_action_name(&action),
"executing consumer network action"
);
match action {
ConsumerNetworkAction::RefreshMetadata => self.refresh_subscription_metadata().await,
ConsumerNetworkAction::EnsureCoordinator => self.ensure_coordinator_connection().await,
ConsumerNetworkAction::Heartbeat => self.send_heartbeat().await,
ConsumerNetworkAction::Commit => self.drain_commit_request().await,
ConsumerNetworkAction::Fetch {
leader_id,
partitions,
} => self.fetch_from_leader(leader_id, partitions).await,
ConsumerNetworkAction::LeaveGroup => self.leave_group().await,
}
}
}