kafkit-client 0.1.4

Kafka 4.0+ pure Rust client.
Documentation
use super::*;

impl ConsumerRuntime {
    pub(crate) fn drain_runtime_events(&mut self, rx: &mut mpsc::Receiver<ConsumerRuntimeEvent>) {
        while let Ok(event) = rx.try_recv() {
            self.lifecycle.runtime_events.push_back(event);
        }
    }

    pub(crate) async fn process_runtime_events(&mut self) -> AnyResult<()> {
        while let Some(event) = self.lifecycle.runtime_events.pop_front() {
            if event.is_cancelled() {
                event.send_cancelled();
                continue;
            }
            trace!(
                event = consumer_event_name(&event),
                pending_events = self.lifecycle.runtime_events.len(),
                "processing consumer application event"
            );
            match event {
                ConsumerRuntimeEvent::WarmUp { reply, .. } => {
                    let result = connect_to_any_bootstrap(
                        &self.config.bootstrap_servers,
                        &self.config.client_id,
                        self.config.request_timeout,
                        self.config.security_protocol,
                        &self.config.tls,
                        &self.config.sasl,
                        &self.config.tcp_connector,
                    )
                    .await
                    .map(|_| ())
                    .map_err(Error::from);
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::Subscribe { topics, reply, .. } => {
                    let result = self.subscribe(topics);
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::SubscribePattern { pattern, reply, .. } => {
                    let result = self.subscribe_pattern(pattern);
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::SubscribeRegex { pattern, reply, .. } => {
                    let result = self.subscribe_regex(pattern).await;
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::Unsubscribe { reply, .. } => {
                    let result = self.assign(Vec::new()).await;
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::Assign {
                    partitions, reply, ..
                } => {
                    let result = self.assign(partitions).await;
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::Poll {
                    timeout,
                    cancellation,
                    reply,
                } => {
                    if let Err(error) = self.register_poll(timeout, cancellation, reply) {
                        self.fail_pending_poll(error);
                    }
                }
                ConsumerRuntimeEvent::Seek {
                    partition,
                    offset,
                    reply,
                    ..
                } => {
                    let result = self.seek(partition.into(), offset);
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::SeekToBeginning {
                    partitions, reply, ..
                } => {
                    let result = self
                        .seek_to_timestamp(partitions, LIST_OFFSETS_EARLIEST)
                        .await;
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::SeekToEnd {
                    partitions, reply, ..
                } => {
                    let result = self
                        .seek_to_timestamp(partitions, LIST_OFFSETS_LATEST)
                        .await;
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::SeekToTimestamp {
                    partitions, reply, ..
                } => {
                    let result = self.seek_partitions_to_timestamp(partitions).await;
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::Position {
                    partition, reply, ..
                } => {
                    let result = self.position(&partition.into());
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::Pause {
                    partitions, reply, ..
                } => {
                    let result = self.pause(partitions);
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::Resume {
                    partitions, reply, ..
                } => {
                    let result = self.resume(partitions);
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::GroupMetadata { reply, .. } => {
                    let _ = reply.send(Ok(self.current_group_metadata()));
                }
                ConsumerRuntimeEvent::Assignment { reply, .. } => {
                    let _ = reply.send(Ok(self.current_assignment()));
                }
                ConsumerRuntimeEvent::Committed {
                    partitions, reply, ..
                } => {
                    let result = self.committed_offsets(partitions).await;
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::BeginningOffsets {
                    partitions, reply, ..
                } => {
                    let result = self.beginning_offsets(partitions).await;
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::EndOffsets {
                    partitions, reply, ..
                } => {
                    let result = self.end_offsets(partitions).await;
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::OffsetsForTimes {
                    partitions, reply, ..
                } => {
                    let result = self.offsets_for_times(partitions).await;
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::PartitionsFor { topic, reply, .. } => {
                    let result = self.partitions_for(topic).await;
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::ListTopics { reply, .. } => {
                    let result = self.list_topics().await;
                    let _ = reply.send(result);
                }
                ConsumerRuntimeEvent::Commit {
                    offsets,
                    cancellation,
                    reply,
                } => {
                    self.lifecycle.pending_commits.push_back(PendingCommit {
                        offsets,
                        cancellation,
                        reply: Some(reply),
                        kind: CommitKind::Manual,
                    });
                }
                ConsumerRuntimeEvent::Wakeup => {
                    self.wakeup();
                }
                ConsumerRuntimeEvent::Shutdown { reply } => {
                    self.lifecycle.shutting_down = true;
                    self.lifecycle.close_reply = Some(reply);
                    self.fail_pending_poll(Error::Internal(anyhow!("consumer is shutting down")));
                }
            }
        }

        Ok(())
    }

    pub(crate) async fn execute_network_action(
        &mut self,
        action: ConsumerNetworkAction,
    ) -> AnyResult<()> {
        trace!(
            action = consumer_network_action_name(&action),
            "executing consumer network action"
        );
        match action {
            ConsumerNetworkAction::RefreshMetadata => self.refresh_subscription_metadata().await,
            ConsumerNetworkAction::EnsureCoordinator => self.ensure_coordinator_connection().await,
            ConsumerNetworkAction::Heartbeat => self.send_heartbeat().await,
            ConsumerNetworkAction::Commit => self.drain_commit_request().await,
            ConsumerNetworkAction::Fetch {
                leader_id,
                partitions,
            } => self.fetch_from_leader(leader_id, partitions).await,
            ConsumerNetworkAction::LeaveGroup => self.leave_group().await,
        }
    }
}

#[cfg(test)]
mod cancellation_tests {
    use super::*;
    use crate::CancellationToken;
    use crate::config::ConsumerConfig;
    use tokio::sync::oneshot;

    #[tokio::test]
    async fn cancelled_list_topics_event_is_abandoned_before_network_work() {
        let mut runtime = ConsumerRuntime::new(ConsumerConfig::new("localhost:9092", "group-a"));
        let cancellation = CancellationToken::new();
        cancellation.cancel();
        let (reply, result) = oneshot::channel();

        runtime
            .lifecycle
            .runtime_events
            .push_back(ConsumerRuntimeEvent::ListTopics {
                cancellation: Some(cancellation),
                reply,
            });

        runtime.process_runtime_events().await.unwrap();

        assert!(runtime.lifecycle.runtime_events.is_empty());
        assert!(matches!(result.await, Ok(Err(Error::Cancelled))));
    }

    #[tokio::test]
    async fn cancelled_poll_event_does_not_register_pending_poll() {
        let mut runtime = ConsumerRuntime::new(ConsumerConfig::new("localhost:9092", "group-a"));
        let cancellation = CancellationToken::new();
        cancellation.cancel();
        let (reply, result) = oneshot::channel();

        runtime
            .lifecycle
            .runtime_events
            .push_back(ConsumerRuntimeEvent::Poll {
                timeout: Duration::from_secs(30),
                cancellation: Some(cancellation),
                reply,
            });

        runtime.process_runtime_events().await.unwrap();

        assert!(runtime.poll_state.pending_poll.is_none());
        assert!(matches!(result.await, Ok(Err(Error::Cancelled))));
    }
}