kafkit-client 0.1.7

Kafka 4.0+ pure Rust client.
Documentation
use super::*;
use tokio::sync::oneshot;

impl ConsumerRuntime {
    pub(crate) fn register_poll(
        &mut self,
        timeout: Duration,
        cancellation: Option<CancellationToken>,
        reply: oneshot::Sender<ClientResult<ConsumerRecords>>,
    ) -> ClientResult<()> {
        self.heartbeat_state.last_application_poll = Instant::now();
        if cancellation
            .as_ref()
            .is_some_and(CancellationToken::is_cancelled)
        {
            let _ = reply.send(Err(Error::Cancelled));
            return Ok(());
        }
        if self.poll_state.pending_poll.is_some() {
            let _ = reply.send(Err(ConsumerError::ConcurrentPoll.into()));
            return Ok(());
        }

        if self.poll_state.wakeup_pending {
            self.poll_state.wakeup_pending = false;
            let _ = reply.send(Err(ConsumerError::Wakeup.into()));
            return Ok(());
        }

        self.poll_state.pending_poll = Some(PendingPoll {
            deadline: Instant::now() + timeout,
            cancellation,
            reply,
        });
        self.maybe_complete_poll();
        Ok(())
    }

    pub(crate) fn wakeup(&mut self) {
        if self.poll_state.pending_poll.is_some() {
            self.fail_pending_poll(ConsumerError::Wakeup.into());
            self.poll_state.wakeup_pending = false;
            return;
        }

        self.poll_state.wakeup_pending = true;
    }

    pub(crate) fn seek(&mut self, partition: TopicPartitionKey, offset: i64) -> ClientResult<()> {
        if offset < 0 {
            return Err(ConsumerError::InvalidSeekOffset { offset }.into());
        }

        let assigned = self
            .assignment_state
            .assignment
            .get_mut(&partition)
            .ok_or_else(|| ConsumerError::PartitionNotAssigned {
                operation: "seek",
                topic: partition.topic.clone(),
                partition: partition.partition,
            })?;
        assigned.fetch_offset = offset;
        self.poll_state
            .delivered_offsets
            .insert(partition.clone(), offset);
        self.discard_buffered_partition(&partition);
        debug!(
            topic = %partition.topic,
            partition = partition.partition,
            offset,
            "updated consumer position via seek"
        );
        Ok(())
    }

    pub(crate) async fn seek_to_timestamp(
        &mut self,
        partitions: Vec<crate::types::TopicPartition>,
        timestamp: i64,
    ) -> ClientResult<()> {
        let keys = self.normalize_assigned_partitions(partitions)?;
        if keys.is_empty() {
            return Ok(());
        }

        let offsets = self.lookup_offsets_at_timestamp(&keys, timestamp).await?;
        for key in keys {
            let offset = offsets.get(&key).copied().ok_or_else(|| {
                Error::Internal(anyhow!(
                    "missing offset result for {}:{} during seek",
                    key.topic,
                    key.partition
                ))
            })?;
            self.seek(key, offset)?;
        }
        Ok(())
    }

    pub(crate) async fn seek_partitions_to_timestamp(
        &mut self,
        partitions: Vec<TopicPartitionTimestamp>,
    ) -> ClientResult<()> {
        let normalized = self.normalize_timestamp_queries(partitions)?;
        let offsets = self.lookup_offsets_for_timestamps(&normalized).await?;
        for partition in normalized {
            let key = TopicPartitionKey::new(partition.topic.clone(), partition.partition);
            let resolved = offsets
                .get(&key)
                .ok_or_else(|| {
                    Error::Internal(anyhow!(
                        "missing offset result for {}:{} during seek",
                        key.topic,
                        key.partition
                    ))
                })?
                .offset;
            self.seek(key, resolved)?;
        }
        Ok(())
    }

    pub(crate) fn position(&self, partition: &TopicPartitionKey) -> ClientResult<i64> {
        let assigned = self
            .assignment_state
            .assignment
            .get(partition)
            .ok_or_else(|| ConsumerError::PartitionNotAssigned {
                operation: "position",
                topic: partition.topic.clone(),
                partition: partition.partition,
            })?;

        if let Some(offset) = self
            .poll_state
            .buffered_records
            .iter()
            .filter(|record| {
                record.topic == partition.topic && record.partition == partition.partition
            })
            .map(|record| record.offset)
            .min()
        {
            return Ok(offset);
        }

        Ok(self
            .poll_state
            .delivered_offsets
            .get(partition)
            .copied()
            .unwrap_or(assigned.fetch_offset))
    }

    pub(crate) fn pause(
        &mut self,
        partitions: Vec<crate::types::TopicPartition>,
    ) -> ClientResult<()> {
        for key in self.normalize_assigned_partitions(partitions)? {
            self.assignment_state.paused_partitions.insert(key);
        }
        debug!(
            paused = self.assignment_state.paused_partitions.len(),
            "paused consumer partitions"
        );
        Ok(())
    }

    pub(crate) fn resume(
        &mut self,
        partitions: Vec<crate::types::TopicPartition>,
    ) -> ClientResult<()> {
        for key in self.normalize_assigned_partitions(partitions)? {
            self.assignment_state.paused_partitions.remove(&key);
        }
        debug!(
            paused = self.assignment_state.paused_partitions.len(),
            "resumed consumer partitions"
        );
        Ok(())
    }

    pub(crate) fn maybe_complete_poll(&mut self) {
        let Some(pending) = self.poll_state.pending_poll.take() else {
            return;
        };

        if pending
            .cancellation
            .as_ref()
            .is_some_and(CancellationToken::is_cancelled)
        {
            let _ = pending.reply.send(Err(Error::Cancelled));
            return;
        }

        if !self.poll_state.buffered_records.is_empty() {
            let records = std::mem::take(&mut self.poll_state.buffered_records);
            let record_count = records.len();
            for record in &records {
                self.poll_state.delivered_offsets.insert(
                    TopicPartitionKey::new(record.topic.clone(), record.partition),
                    record.offset + 1,
                );
            }
            let _ = pending.reply.send(Ok(ConsumerRecords::new(records)));
            debug!(
                record_count,
                "completed consumer poll with buffered records"
            );
            return;
        }

        if Instant::now() >= pending.deadline
            || (!self.has_group_subscription() && self.desired_topics().is_empty())
        {
            let _ = pending.reply.send(Ok(ConsumerRecords::default()));
            trace!("completed consumer poll with no records");
            return;
        }

        self.poll_state.pending_poll = Some(pending);
    }

    pub(crate) fn fail_pending_poll(&mut self, error: Error) {
        if let Some(pending) = self.poll_state.pending_poll.take() {
            let _ = pending.reply.send(Err(error));
        }
    }
}