// kafkit-client 0.1.9
//
// Kafka 4.0+ pure Rust client.
// Documentation
use super::*;
use tokio::sync::oneshot;

impl ConsumerRuntime {
    /// Registers an application poll request and tries to answer it right away.
    ///
    /// The poll outcome is always reported through `reply`; a failed send
    /// (the receiving half is gone) is deliberately ignored.
    ///
    /// Checks are applied in this order:
    /// 1. `cancellation` is already cancelled: replies `Error::Cancelled`.
    /// 2. another poll is still pending: replies `ConsumerError::ConcurrentPoll`.
    /// 3. a wakeup was recorded earlier: clears the flag and replies
    ///    `ConsumerError::Wakeup`.
    /// 4. otherwise the poll is stored as pending (deadline = now + `timeout`)
    ///    and `maybe_complete_poll` is invoked once.
    ///
    /// `last_application_poll` is refreshed on every call, including the
    /// rejected ones.
    ///
    /// # Errors
    /// The body shown here never returns `Err`; failures are delivered through
    /// `reply` instead.
    pub(crate) fn register_poll(
        &mut self,
        timeout: Duration,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<ClientResult<ConsumerRecords>>,
    ) -> ClientResult<()> {
        // Upper bound used when `timeout` is too large to be added to an `Instant`.
        const FAR_FUTURE: Duration = Duration::from_secs(60 * 60 * 24 * 365 * 30);

        self.heartbeat_state.last_application_poll = Instant::now();
        if cancellation
            .as_ref()
            .is_some_and(CancellationToken::is_cancelled)
        {
            let _ = reply.send(Err(Error::Cancelled));
            return Ok(());
        }
        if self.poll_state.pending_poll.is_some() {
            let _ = reply.send(Err(ConsumerError::ConcurrentPoll.into()));
            return Ok(());
        }

        if self.poll_state.wakeup_pending {
            self.poll_state.wakeup_pending = false;
            let _ = reply.send(Err(ConsumerError::Wakeup.into()));
            return Ok(());
        }

        let started_at = Instant::now();
        // `Instant + Duration` panics when the sum cannot be represented, which a
        // caller can trigger with an "effectively infinite" timeout such as
        // `Duration::MAX`. Clamp to a far-future deadline instead of panicking.
        let deadline = started_at
            .checked_add(timeout)
            .or_else(|| started_at.checked_add(FAR_FUTURE))
            .unwrap_or(started_at);
        telemetry::record_consumer_poll_started(&self.config.client_id, &self.config.group_id);
        self.poll_state.pending_poll = Some(PendingPoll {
            started_at,
            deadline,
            cancellation,
            reply,
        });
        self.maybe_complete_poll();
        Ok(())
    }

    /// Interrupts polling with `ConsumerError::Wakeup`.
    ///
    /// When a poll is pending it is failed immediately and no wakeup stays
    /// recorded. Otherwise the wakeup is remembered in `wakeup_pending`.
    pub(crate) fn wakeup(&mut self) {
        let poll_in_flight = self.poll_state.pending_poll.is_some();
        if poll_in_flight {
            self.fail_pending_poll(ConsumerError::Wakeup.into());
        }
        // The flag is only kept when there was no poll to interrupt.
        self.poll_state.wakeup_pending = !poll_in_flight;
    }

    /// Moves the consumer position of an assigned partition to `offset`.
    ///
    /// Updates the partition's `fetch_offset`, overwrites its entry in
    /// `delivered_offsets`, and calls `discard_buffered_partition` for it.
    ///
    /// # Errors
    /// * `ConsumerError::InvalidSeekOffset` when `offset` is negative.
    /// * `ConsumerError::PartitionNotAssigned` when `partition` is not in the
    ///   current assignment.
    pub(crate) fn seek(&mut self, partition: TopicPartitionKey, offset: i64) -> ClientResult<()> {
        if offset < 0 {
            return Err(ConsumerError::InvalidSeekOffset { offset }.into());
        }

        let Some(entry) = self.assignment_state.assignment.get_mut(&partition) else {
            return Err(ConsumerError::PartitionNotAssigned {
                operation: "seek",
                topic: partition.topic.clone(),
                partition: partition.partition,
            }
            .into());
        };
        entry.fetch_offset = offset;

        let delivered = &mut self.poll_state.delivered_offsets;
        delivered.insert(partition.clone(), offset);
        self.discard_buffered_partition(&partition);

        debug!(
            topic = %partition.topic,
            partition = partition.partition,
            offset,
            "updated consumer position via seek"
        );
        Ok(())
    }

    pub(crate) async fn seek_to_timestamp(
        &mut self,
        partitions: Vec<crate::types::TopicPartition>,
        timestamp: i64,
    ) -> ClientResult<()> {
        let keys = self.normalize_assigned_partitions(partitions)?;
        if keys.is_empty() {
            return Ok(());
        }

        let offsets = self
            .lookup_offsets_at_timestamp(&keys, timestamp)
            .await
            .map_err(into_client_error)?;
        for key in keys {
            let offset = offsets.get(&key).copied().ok_or_else(|| {
                Error::Protocol(ProtocolError::MissingResponseData {
                    operation: "seek_to_timestamp",
                    detail: format!("missing offset result for {}:{}", key.topic, key.partition),
                })
            })?;
            self.seek(key, offset)?;
        }
        Ok(())
    }

    /// Seeks each partition to the offset looked up for its own timestamp.
    ///
    /// The queries are first passed through `normalize_timestamp_queries`. An
    /// empty normalized request returns `Ok(())` without performing an offset
    /// lookup, matching the short-circuit in `seek_to_timestamp`. Seeks are
    /// applied one partition at a time, so an error part-way through leaves the
    /// earlier partitions already repositioned.
    ///
    /// # Errors
    /// Propagates normalization, lookup and `seek` errors, and reports
    /// `ProtocolError::MissingResponseData` when the lookup result has no entry
    /// for a requested partition.
    pub(crate) async fn seek_partitions_to_timestamp(
        &mut self,
        partitions: Vec<TopicPartitionTimestamp>,
    ) -> ClientResult<()> {
        let normalized = self.normalize_timestamp_queries(partitions)?;
        // Nothing to resolve: skip the lookup entirely, as `seek_to_timestamp` does.
        if normalized.is_empty() {
            return Ok(());
        }

        let offsets = self
            .lookup_offsets_for_timestamps(&normalized)
            .await
            .map_err(into_client_error)?;
        for partition in normalized {
            let key = TopicPartitionKey::new(partition.topic.clone(), partition.partition);
            let resolved = offsets
                .get(&key)
                .ok_or_else(|| {
                    Error::Protocol(ProtocolError::MissingResponseData {
                        operation: "seek_to_timestamp",
                        detail: format!(
                            "missing offset result for {}:{}",
                            key.topic, key.partition
                        ),
                    })
                })?
                .offset;
            self.seek(key, resolved)?;
        }
        Ok(())
    }

    /// Returns the consumer position for an assigned partition.
    ///
    /// Resolution order:
    /// 1. the smallest offset among buffered records of that partition,
    /// 2. the partition's entry in `delivered_offsets`,
    /// 3. the assignment's `fetch_offset`.
    ///
    /// # Errors
    /// `ConsumerError::PartitionNotAssigned` when `partition` is not in the
    /// current assignment.
    pub(crate) fn position(&self, partition: &TopicPartitionKey) -> ClientResult<i64> {
        let Some(assigned) = self.assignment_state.assignment.get(partition) else {
            return Err(ConsumerError::PartitionNotAssigned {
                operation: "position",
                topic: partition.topic.clone(),
                partition: partition.partition,
            }
            .into());
        };

        let belongs_here = |topic: &_, index: &_| {
            *topic == partition.topic && *index == partition.partition
        };
        let earliest_buffered = self
            .poll_state
            .buffered_records
            .iter()
            .filter(|record| belongs_here(&record.topic, &record.partition))
            .map(|record| record.offset)
            .min();

        let position = earliest_buffered
            .or_else(|| self.poll_state.delivered_offsets.get(partition).copied())
            .unwrap_or(assigned.fetch_offset);
        Ok(position)
    }

    /// Adds the given partitions to the paused set.
    ///
    /// The partitions are first passed through `normalize_assigned_partitions`;
    /// if that fails nothing is paused and the error is returned.
    pub(crate) fn pause(
        &mut self,
        partitions: Vec<crate::types::TopicPartition>,
    ) -> ClientResult<()> {
        let keys = self.normalize_assigned_partitions(partitions)?;
        let paused_set = &mut self.assignment_state.paused_partitions;
        for key in keys {
            paused_set.insert(key);
        }
        debug!(paused = paused_set.len(), "paused consumer partitions");
        Ok(())
    }

    /// Removes the given partitions from the paused set.
    ///
    /// The partitions are first passed through `normalize_assigned_partitions`;
    /// if that fails nothing is resumed and the error is returned.
    pub(crate) fn resume(
        &mut self,
        partitions: Vec<crate::types::TopicPartition>,
    ) -> ClientResult<()> {
        let keys = self.normalize_assigned_partitions(partitions)?;
        let paused_set = &mut self.assignment_state.paused_partitions;
        for key in keys {
            paused_set.remove(&key);
        }
        debug!(paused = paused_set.len(), "resumed consumer partitions");
        Ok(())
    }

    /// Tries to resolve the pending poll, if there is one.
    ///
    /// Outcomes, checked in order:
    /// * the poll's cancellation token is cancelled: replies `Error::Cancelled`;
    /// * buffered records exist: hands all of them to the caller and records
    ///   `offset + 1` per record in `delivered_offsets`;
    /// * the deadline has passed, or there is neither a group subscription nor
    ///   any desired topic: replies with empty `ConsumerRecords`;
    /// * otherwise the poll is put back and stays pending.
    ///
    /// Every resolved outcome is reported to telemetry; reply send failures are
    /// ignored.
    pub(crate) fn maybe_complete_poll(&mut self) {
        let pending = match self.poll_state.pending_poll.take() {
            Some(pending) => pending,
            None => return,
        };

        let cancelled = matches!(&pending.cancellation, Some(token) if token.is_cancelled());
        if cancelled {
            telemetry::record_consumer_poll_completed(
                &self.config.client_id,
                &self.config.group_id,
                pending.started_at.elapsed(),
                0,
                "cancelled",
            );
            let _ = pending.reply.send(Err(Error::Cancelled));
            return;
        }

        let has_buffered = !self.poll_state.buffered_records.is_empty();
        if has_buffered {
            let drained = std::mem::take(&mut self.poll_state.buffered_records);
            // Keep this local's name: it doubles as the log field name below.
            let record_count = drained.len();
            let delivered = &mut self.poll_state.delivered_offsets;
            for item in drained.iter() {
                let key = TopicPartitionKey::new(item.topic.clone(), item.partition);
                delivered.insert(key, item.offset + 1);
            }
            let _ = pending.reply.send(Ok(ConsumerRecords::new(drained)));
            telemetry::record_consumer_poll_completed(
                &self.config.client_id,
                &self.config.group_id,
                pending.started_at.elapsed(),
                record_count,
                "records",
            );
            debug!(
                record_count,
                "completed consumer poll with buffered records"
            );
            return;
        }

        // Either the deadline has passed or there is nothing to consume from.
        let finish_empty = Instant::now() >= pending.deadline
            || !(self.has_group_subscription() || !self.desired_topics().is_empty());
        if !finish_empty {
            // Still waiting: keep the poll registered.
            self.poll_state.pending_poll = Some(pending);
            return;
        }

        let _ = pending.reply.send(Ok(ConsumerRecords::default()));
        telemetry::record_consumer_poll_completed(
            &self.config.client_id,
            &self.config.group_id,
            pending.started_at.elapsed(),
            0,
            "empty",
        );
        trace!("completed consumer poll with no records");
    }

    /// Fails the pending poll with `error`; does nothing when no poll is pending.
    ///
    /// The failure is reported to telemetry with outcome `"error"` before the
    /// reply is sent. A failed send is ignored.
    pub(crate) fn fail_pending_poll(&mut self, error: Error) {
        let Some(pending) = self.poll_state.pending_poll.take() else {
            return;
        };

        let elapsed = pending.started_at.elapsed();
        telemetry::record_consumer_poll_completed(
            &self.config.client_id,
            &self.config.group_id,
            elapsed,
            0,
            "error",
        );
        let _ = pending.reply.send(Err(error));
    }
}