kafkit-client 0.1.9

Kafka 4.0+ pure Rust client.
Documentation
use super::*;

impl ConsumerRuntime {
    pub(crate) async fn drain_commit_request(&mut self) -> AnyResult<()> {
        let Some(pending) = self.lifecycle.pending_commits.pop_front() else {
            return Ok(());
        };

        if pending
            .cancellation
            .as_ref()
            .is_some_and(CancellationToken::is_cancelled)
        {
            if let Some(reply) = pending.reply {
                let _ = reply.send(Err(Error::Cancelled));
            }
            return Ok(());
        }

        let started_at = Instant::now();
        let kind = pending.kind;
        let offset_count = pending.offsets.len();
        let result = self.commit_offsets_inner(pending.offsets).await;
        telemetry::record_consumer_commit_completed(
            &self.config.client_id,
            &self.config.group_id,
            commit_kind_label(kind),
            offset_count,
            started_at.elapsed(),
            result.is_ok(),
        );
        match result {
            Ok(()) => {
                if kind == CommitKind::Auto {
                    self.lifecycle.last_auto_commit = Instant::now();
                }
                if let Some(reply) = pending.reply {
                    let _ = reply.send(Ok(()));
                }
                Ok(())
            }
            Err(error) => {
                let client_error = into_client_error(error);
                let error_text = format!("{client_error:#}");
                if let Some(reply) = pending.reply {
                    let _ = reply.send(Err(client_error));
                }
                Err(anyhow!(error_text))
            }
        }
    }

    pub(crate) async fn commit_offsets_inner(
        &mut self,
        offsets: Vec<CommitOffset>,
    ) -> AnyResult<()> {
        if offsets.is_empty() {
            return Ok(());
        }

        debug!(offset_count = offsets.len(), "committing consumer offsets");
        self.ensure_coordinator_connection().await?;
        let client_id = self.config.client_id.clone();
        let version = self
            .connections
            .coordinator_connection
            .as_ref()
            .context("coordinator connection is missing")?
            .version_with_cap::<OffsetCommitRequest>(OFFSET_COMMIT_VERSION_CAP)?;
        let (member_id, member_epoch) = self.current_group_member();
        let commit_member_id = member_id.as_ref().map(ToString::to_string);

        let request = build_offset_commit_request(
            &self.config.group_id,
            commit_member_id.as_deref(),
            member_epoch,
            if member_epoch > 0 {
                self.config.instance_id.as_deref()
            } else {
                None
            },
            &offsets,
            &self.assignment_state.assignment,
        )?;

        let response: OffsetCommitResponse = match self
            .connections
            .coordinator_connection
            .as_mut()
            .context("coordinator connection is missing")?
            .send_request::<OffsetCommitRequest>(&client_id, version, &request)
            .await
        {
            Ok(response) => response,
            Err(error) => {
                self.backoff_coordinator();
                return Err(error);
            }
        };

        for topic in response.topics {
            let topic_name = topic.name.0.to_string();
            for partition in topic.partitions {
                if let Some(error) = partition.error_code.err() {
                    if error.is_retriable() {
                        self.backoff_coordinator();
                        return Err(error.into());
                    }
                    return Err(broker_response_error(
                        "offset_commit",
                        Some(format!("{}:{}", topic_name, partition.partition_index)),
                        error,
                    )
                    .into());
                }
            }
        }

        for offset in offsets {
            self.poll_state.delivered_offsets.insert(
                TopicPartitionKey::new(offset.topic, offset.partition),
                offset.offset,
            );
        }

        debug!("consumer offset commit completed");
        Ok(())
    }

    pub(crate) fn fail_pending_commits(&mut self, message: &str) {
        while let Some(pending) = self.lifecycle.pending_commits.pop_front() {
            if let Some(reply) = pending.reply {
                let _ = reply.send(Err(Error::Consumer(ConsumerError::Fatal {
                    message: message.to_owned(),
                })));
            }
        }
    }
}

fn commit_kind_label(kind: CommitKind) -> &'static str {
    match kind {
        CommitKind::Manual => "manual",
        CommitKind::Auto => "auto",
    }
}