kafkit-client 0.1.2

Kafka 4.0+ pure Rust client.
Documentation
use super::*;

impl ConsumerRuntime {
    pub(crate) fn drain_runtime_events(&mut self, rx: &mut mpsc::Receiver<ConsumerRuntimeEvent>) {
        while let Ok(event) = rx.try_recv() {
            self.lifecycle.runtime_events.push_back(event);
        }
    }

    pub(crate) async fn process_runtime_events(&mut self) -> AnyResult<()> {
        while let Some(event) = self.lifecycle.runtime_events.pop_front() {
            trace!(
                event = consumer_event_name(&event),
                pending_events = self.lifecycle.runtime_events.len(),
                "processing consumer application event"
            );
            match event {
                ConsumerRuntimeEvent::WarmUp { reply } => {
                    let result = connect_to_any_bootstrap(
                        &self.config.bootstrap_servers,
                        &self.config.client_id,
                        self.config.request_timeout,
                        self.config.security_protocol,
                        &self.config.tls,
                        &self.config.sasl,
                    )
                    .await
                    .map(|_| ())
                    .map_err(Error::from);
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::Subscribe { topics, reply } => {
                    let result = self.subscribe(topics);
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::SubscribePattern { pattern, reply } => {
                    let result = self.subscribe_pattern(pattern);
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::SubscribeRegex { pattern, reply } => {
                    let result = self.subscribe_regex(pattern).await;
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::Unsubscribe { reply } => {
                    let result = self.assign(Vec::new()).await;
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::Assign { partitions, reply } => {
                    let result = self.assign(partitions).await;
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::Poll { timeout, reply } => {
                    if let Err(error) = self.register_poll(timeout, reply) {
                        self.fail_pending_poll(error);
                    }
                }
                ConsumerRuntimeEvent::Seek {
                    partition,
                    offset,
                    reply,
                } => {
                    let result = self.seek(partition.into(), offset);
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::SeekToBeginning { partitions, reply } => {
                    let result = self
                        .seek_to_timestamp(partitions, LIST_OFFSETS_EARLIEST)
                        .await;
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::SeekToEnd { partitions, reply } => {
                    let result = self
                        .seek_to_timestamp(partitions, LIST_OFFSETS_LATEST)
                        .await;
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::SeekToTimestamp { partitions, reply } => {
                    let result = self.seek_partitions_to_timestamp(partitions).await;
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::Position { partition, reply } => {
                    let result = self.position(&partition.into());
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::Pause { partitions, reply } => {
                    let result = self.pause(partitions);
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::Resume { partitions, reply } => {
                    let result = self.resume(partitions);
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::GroupMetadata { reply } => {
                    let _ = reply.send(Ok(self.current_group_metadata()));
                }
                ConsumerRuntimeEvent::Assignment { reply } => {
                    let _ = reply.send(Ok(self.current_assignment()));
                }
                ConsumerRuntimeEvent::Committed { partitions, reply } => {
                    let result = self.committed_offsets(partitions).await;
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::BeginningOffsets { partitions, reply } => {
                    let result = self.beginning_offsets(partitions).await;
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::EndOffsets { partitions, reply } => {
                    let result = self.end_offsets(partitions).await;
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::OffsetsForTimes { partitions, reply } => {
                    let result = self.offsets_for_times(partitions).await;
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::PartitionsFor { topic, reply } => {
                    let result = self.partitions_for(topic).await;
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::ListTopics { reply } => {
                    let result = self.list_topics().await;
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::Commit { offsets, reply } => {
                    self.lifecycle.pending_commits.push_back(PendingCommit {
                        offsets,
                        reply: Some(reply),
                        kind: CommitKind::Manual,
                    });
                }
                ConsumerRuntimeEvent::Wakeup => {
                    self.wakeup();
                }
                ConsumerRuntimeEvent::Shutdown { reply } => {
                    self.lifecycle.shutting_down = true;
                    self.lifecycle.close_reply = Some(reply);
                    self.fail_pending_poll(Error::Internal(anyhow!("consumer is shutting down")));
                }
            }
        }

        Ok(())
    }

    pub(crate) async fn execute_network_action(
        &mut self,
        action: ConsumerNetworkAction,
    ) -> AnyResult<()> {
        trace!(
            action = consumer_network_action_name(&action),
            "executing consumer network action"
        );
        match action {
            ConsumerNetworkAction::RefreshMetadata => self.refresh_subscription_metadata().await,
            ConsumerNetworkAction::EnsureCoordinator => self.ensure_coordinator_connection().await,
            ConsumerNetworkAction::Heartbeat => self.send_heartbeat().await,
            ConsumerNetworkAction::Commit => self.drain_commit_request().await,
            ConsumerNetworkAction::Fetch {
                leader_id,
                partitions,
            } => self.fetch_from_leader(leader_id, partitions).await,
            ConsumerNetworkAction::LeaveGroup => self.leave_group().await,
        }
    }
}