kafkit-client 0.1.7

Kafka 4.0+ pure Rust client.
Documentation
use super::*;

impl ConsumerRuntime {
    pub(crate) async fn drain_commit_request(&mut self) -> AnyResult<()> {
        let Some(pending) = self.lifecycle.pending_commits.pop_front() else {
            return Ok(());
        };

        if pending
            .cancellation
            .as_ref()
            .is_some_and(CancellationToken::is_cancelled)
        {
            if let Some(reply) = pending.reply {
                let _ = reply.send(Err(Error::Cancelled));
            }
            return Ok(());
        }

        let result = self.commit_offsets_inner(pending.offsets).await;
        match result {
            Ok(()) => {
                if pending.kind == CommitKind::Auto {
                    self.lifecycle.last_auto_commit = Instant::now();
                }
                if let Some(reply) = pending.reply {
                    let _ = reply.send(Ok(()));
                }
                Ok(())
            }
            Err(error) => {
                let client_error = into_client_error(error);
                let error_text = format!("{client_error:#}");
                if let Some(reply) = pending.reply {
                    let _ = reply.send(Err(client_error));
                }
                Err(anyhow!(error_text))
            }
        }
    }

    pub(crate) async fn commit_offsets_inner(
        &mut self,
        offsets: Vec<CommitOffset>,
    ) -> AnyResult<()> {
        if offsets.is_empty() {
            return Ok(());
        }

        debug!(offset_count = offsets.len(), "committing consumer offsets");
        self.ensure_coordinator_connection().await?;
        let client_id = self.config.client_id.clone();
        let version = self
            .connections
            .coordinator_connection
            .as_ref()
            .context("coordinator connection is missing")?
            .version_with_cap::<OffsetCommitRequest>(OFFSET_COMMIT_VERSION_CAP)?;
        let (member_id, member_epoch) = self.current_group_member();
        let commit_member_id = member_id.as_ref().map(ToString::to_string);

        let request = build_offset_commit_request(
            &self.config.group_id,
            commit_member_id.as_deref(),
            member_epoch,
            if member_epoch > 0 {
                self.config.instance_id.as_deref()
            } else {
                None
            },
            &offsets,
            &self.assignment_state.assignment,
        )?;

        let response: OffsetCommitResponse = self
            .connections
            .coordinator_connection
            .as_mut()
            .context("coordinator connection is missing")?
            .send_request::<OffsetCommitRequest>(&client_id, version, &request)
            .await?;

        for topic in response.topics {
            let topic_name = topic.name.0.to_string();
            for partition in topic.partitions {
                if let Some(error) = partition.error_code.err() {
                    if error.is_retriable() {
                        self.backoff_coordinator();
                        return Err(error.into());
                    }
                    bail!(
                        "offset commit failed for {}:{}: {}",
                        topic_name,
                        partition.partition_index,
                        error
                    );
                }
            }
        }

        for offset in offsets {
            self.poll_state.delivered_offsets.insert(
                TopicPartitionKey::new(offset.topic, offset.partition),
                offset.offset,
            );
        }

        debug!("consumer offset commit completed");
        Ok(())
    }

    pub(crate) fn fail_pending_commits(&mut self, message: &str) {
        while let Some(pending) = self.lifecycle.pending_commits.pop_front() {
            if let Some(reply) = pending.reply {
                let _ = reply.send(Err(Error::Internal(anyhow!(message.to_owned()))));
            }
        }
    }
}