use std::collections::{HashMap, HashSet, VecDeque};
use std::time::{Duration, Instant};
use anyhow::{Context, Result as AnyResult, anyhow, bail};
use kafka_protocol::error::{ParseResponseErrorCode, ResponseError};
use kafka_protocol::messages::find_coordinator_response::FindCoordinatorResponse;
use kafka_protocol::messages::init_producer_id_response::InitProducerIdResponse;
use kafka_protocol::messages::produce_response::ProduceResponse;
use kafka_protocol::messages::txn_offset_commit_response::TxnOffsetCommitResponse;
use kafka_protocol::messages::{
EndTxnRequest, FindCoordinatorRequest, InitProducerIdRequest, ProduceRequest, ProducerId,
TransactionalId, TxnOffsetCommitRequest,
};
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::records::{NO_PRODUCER_EPOCH, NO_PRODUCER_ID};
use tokio::sync::{mpsc, oneshot};
use tokio::time::{sleep, timeout};
use tracing::{Instrument, debug, info_span, trace, warn};
mod event;
mod metadata;
use super::ProducerRuntimeEvent;
use super::accumulator::{
BatchProducerState, ProducerBatch, RecordAccumulator, estimate_record_size, fail_batch,
fail_batch_with_error,
};
use super::partitioner::DefaultPartitionerState;
use super::request::{acks_for_request, build_produce_request};
use super::transaction::{
ProducerIdentity, TransactionCoordinator, TransactionCoordinatorConnection,
TransactionFailureDisposition, TransactionManager, TransactionResult,
build_find_coordinator_request, build_group_find_coordinator_request,
build_txn_offset_commit_request, classify_transactional_error, ensure_transaction_v2_feature,
find_coordinator_error, parse_find_coordinator_response, validate_group_metadata,
};
use crate::config::ProducerConfig;
use crate::constants::{
END_TXN_TRANSACTION_V2_MIN_VERSION, END_TXN_VERSION_CAP, FIND_COORDINATOR_VERSION_CAP,
INIT_PRODUCER_ID_MIN_VERSION, INIT_PRODUCER_ID_VERSION_CAP, PRODUCE_TRANSACTION_V2_MIN_VERSION,
PRODUCE_VERSION_CAP, TXN_OFFSET_COMMIT_TRANSACTION_V2_MIN_VERSION,
TXN_OFFSET_COMMIT_VERSION_CAP,
};
use crate::network::{BrokerConnection, connect_to_any_bootstrap, duration_to_i32_ms};
use crate::telemetry::{
self, ProducerAccumulatorMetrics, ProducerBatchMetrics, ProducerRequestMetrics,
};
use crate::types::{
CommitOffset, ConsumerGroupMetadata, ProduceAck, TopicPartitionInfo, TopicPartitionKey,
};
use crate::{BrokerError, Error, ProducerError, ProtocolError, Result as ClientResult};
use event::producer_event_name;
use metadata::ProducerMetadata;
/// Background sender state for a producer.
///
/// Holds the record accumulator, cached cluster metadata, pooled broker
/// connections and the transactional / idempotent producer state. It is driven
/// by [`Sender::run`].
pub struct Sender {
    /// Producer configuration supplied at construction.
    config: ProducerConfig,
    /// Cached cluster metadata (topic partitions, partition leaders, brokers).
    metadata: ProducerMetadata,
    /// Idle connections keyed by broker id. Dispatch pops one; a completed
    /// request pushes its connection back.
    connections: HashMap<i32, Vec<BrokerConnection>>,
    /// Record batches waiting to be sent.
    accumulator: RecordAccumulator,
    /// Application events received but not yet processed.
    application_events: VecDeque<ProducerRuntimeEvent>,
    /// Replies to `Flush` requests that arrived while the accumulator still had
    /// undrained batches.
    pending_flushes: Vec<oneshot::Sender<ClientResult<()>>>,
    /// Set once a `Shutdown` event arrives or the application channel closes.
    shutting_down: bool,
    /// Reply channel of the `Shutdown` request, if one was received.
    close_reply: Option<oneshot::Sender<ClientResult<()>>>,
    /// Transaction state; `Some` only when a transactional id is configured.
    transaction: Option<TransactionManager>,
    /// Producer id/epoch of a non-transactional idempotent producer. Cleared
    /// when the broker reports a recoverable sequence error.
    idempotent_producer: Option<ProducerIdentity>,
    /// Next sequence number per partition for the idempotent producer.
    idempotent_sequences: HashMap<TopicPartitionKey, i32>,
    /// Transaction-coordinator connection. Not used in this section;
    /// presumably managed by the transaction helpers (verify).
    transaction_coordinator_connection: Option<TransactionCoordinatorConnection>,
    /// State kept by the default partitioner between calls.
    partitioner: DefaultPartitionerState,
    /// Cloned into each spawned produce task to report its outcome.
    produce_outcome_tx: mpsc::UnboundedSender<LeaderProduceOutcome>,
    /// Receives the outcome of each spawned produce task.
    produce_outcome_rx: mpsc::UnboundedReceiver<LeaderProduceOutcome>,
    /// Total number of produce requests currently in flight.
    in_flight_produce_count: usize,
    /// In-flight produce requests per leader broker id; entries are removed
    /// when their count drops to zero.
    in_flight_produce_by_leader: HashMap<i32, usize>,
}
/// A built produce request for one leader broker.
///
/// Also carries the summary figures used for tracing spans and telemetry.
struct LeaderProduceRequest {
    /// Broker id of the partition leader the request is addressed to.
    leader_id: i32,
    request: ProduceRequest,
    /// Whether the request carries a transactional id.
    transactional: bool,
    /// `false` when the effective acks value is 0. In that case the request is
    /// written without waiting for a response.
    expects_response: bool,
    batch_count: usize,
    record_count: usize,
    /// Sum of the batches' estimated sizes.
    batch_bytes: usize,
    oldest_batch_age_ms: u128,
}
/// Result of a spawned produce task, sent back to the sender loop.
struct LeaderProduceOutcome {
    leader_id: i32,
    /// The batches that were carried by the request.
    batches: Vec<ProducerBatch>,
    /// On success: the connection (returned to the pool by the sender) and the
    /// response, which is `None` for requests sent without expecting one.
    result: AnyResult<(BrokerConnection, Option<ProduceResponse>)>,
}
/// Decision returned by `Sender::check_buffer_capacity` for a pending append.
enum BufferCapacity {
    /// The record fits in the buffer and can be appended now.
    Ready,
    /// The buffer is currently full; keep the event queued and try again later.
    Wait,
    /// The record can never fit, or it has waited at least `max_block`.
    Reject(ProducerError),
}
/// Sends one produce request to a leader and packages the result for the
/// sender loop.
///
/// Called from the task spawned in `Sender::send_ready_batches`. Records
/// dispatch and completion telemetry around the request. On success the
/// connection is handed back inside the outcome so the sender can pool it
/// again.
async fn send_leader_produce_request(
    mut connection: BrokerConnection,
    client_id: String,
    send: LeaderProduceRequest,
    batches: Vec<ProducerBatch>,
) -> LeaderProduceOutcome {
    let started = Instant::now();
    let request_span = info_span!(
        "producer.produce_request",
        leader_id = send.leader_id,
        batch_count = send.batch_count,
        record_count = send.record_count,
        batch_bytes = send.batch_bytes,
        oldest_batch_age_ms = send.oldest_batch_age_ms,
        transactional = send.transactional
    );
    // Copy the summary fields out first: `send` is moved into the request
    // future below, but they are still needed for the completion metrics.
    let leader_id = send.leader_id;
    let request_transactional = send.transactional;
    let request_expects_response = send.expects_response;
    let request_batch_count = send.batch_count;
    let request_record_count = send.record_count;
    let request_batch_bytes = send.batch_bytes;
    let request_oldest_batch_age_ms = send.oldest_batch_age_ms;
    let request_metrics = ProducerRequestMetrics {
        client_id: &client_id,
        leader_id,
        transactional: request_transactional,
        expects_response: request_expects_response,
        batch_count: request_batch_count,
        record_count: request_record_count,
        batch_bytes: request_batch_bytes,
        oldest_batch_age_ms: request_oldest_batch_age_ms,
    };
    telemetry::record_producer_request_dispatched(&request_metrics);
    // `client_id` is moved into the request future; keep a copy for the
    // completion metrics.
    let completed_client_id = client_id.clone();
    let result = async move {
        let version = connection.version_with_cap::<ProduceRequest>(PRODUCE_VERSION_CAP)?;
        // Transactional produce requires a minimum Produce API version.
        if send.transactional && version < PRODUCE_TRANSACTION_V2_MIN_VERSION {
            bail!(
                "broker only supports Produce v{}, but transaction v2 requires Produce v{}+",
                version,
                PRODUCE_TRANSACTION_V2_MIN_VERSION
            );
        }
        // Requests built with acks=0 are written without waiting for a reply.
        let response = if send.expects_response {
            Some(
                connection
                    .send_request::<ProduceRequest>(&client_id, version, &send.request)
                    .await?,
            )
        } else {
            connection
                .send_request_without_response::<ProduceRequest>(&client_id, version, &send.request)
                .await?;
            None
        };
        Ok((connection, response))
    }
    .instrument(request_span)
    .await;
    let completion_metrics = ProducerRequestMetrics {
        client_id: &completed_client_id,
        leader_id,
        transactional: request_transactional,
        expects_response: request_expects_response,
        batch_count: request_batch_count,
        record_count: request_record_count,
        batch_bytes: request_batch_bytes,
        oldest_batch_age_ms: request_oldest_batch_age_ms,
    };
    telemetry::record_producer_request_completed(
        &completion_metrics,
        started.elapsed(),
        result.is_ok(),
    );
    LeaderProduceOutcome {
        leader_id,
        batches,
        result,
    }
}
impl Sender {
/// Creates a sender for `config` with empty queues, pools and caches.
///
/// A [`TransactionManager`] is created only when `config.transactional_id` is
/// set, using `config.transaction_timeout`.
pub fn new(config: ProducerConfig) -> Self {
    let transaction = config
        .transactional_id
        .as_ref()
        .map(|id| TransactionManager::new(id.clone(), config.transaction_timeout));
    let (outcome_tx, outcome_rx) = mpsc::unbounded_channel();
    Self {
        transaction,
        metadata: ProducerMetadata::default(),
        connections: HashMap::new(),
        accumulator: RecordAccumulator::default(),
        application_events: VecDeque::new(),
        pending_flushes: Vec::new(),
        shutting_down: false,
        close_reply: None,
        idempotent_producer: None,
        idempotent_sequences: HashMap::new(),
        transaction_coordinator_connection: None,
        partitioner: DefaultPartitionerState::default(),
        produce_outcome_tx: outcome_tx,
        produce_outcome_rx: outcome_rx,
        in_flight_produce_count: 0,
        in_flight_produce_by_leader: HashMap::new(),
        config,
    }
}
/// Runs the sender event loop until shutdown completes, then fails any work
/// still outstanding.
///
/// Each iteration waits for whichever comes first: an application event, the
/// outcome of a spawned produce request, or the next scheduled wakeup. It then
/// performs one pass of `run_once`, and the loop ends when `run_once` returns
/// `false`.
///
/// On exit, queued batches, pending transaction operations and pending flushes
/// are failed. A `Shutdown` reply that is still pending receives
/// `RuntimeStoppedDuring { operation: "shutdown" }`.
pub async fn run(mut self, mut rx: mpsc::Receiver<ProducerRuntimeEvent>) {
    let client_id = self.config.client_id.clone();
    let transactional = self.transaction.is_some();
    async move {
        // Once the application channel is closed, `rx.recv()` resolves to
        // `None` immediately on every poll. With `biased;` that branch would
        // always win, turning the loop into a busy spin while in-flight
        // requests drain during shutdown. Disable the branch after the first
        // `None` so the loop waits for produce outcomes or the next wakeup.
        let mut events_open = true;
        let mut running = true;
        while running {
            let wake = self.next_wakeup();
            tokio::select! {
                biased;
                maybe_event = rx.recv(), if events_open => {
                    match maybe_event {
                        Some(event) => self.application_events.push_back(event),
                        None => {
                            debug!("producer application event channel closed");
                            events_open = false;
                            self.shutting_down = true;
                        }
                    }
                }
                maybe_outcome = self.produce_outcome_rx.recv() => {
                    if let Some(outcome) = maybe_outcome {
                        self.handle_produce_outcome(outcome);
                    }
                }
                _ = sleep(wake) => {}
            }
            running = self.run_once(&mut rx).await;
        }
        self.accumulator.fail_all("producer sender thread stopped");
        if let Some(transaction) = self.transaction.as_mut() {
            transaction.fail_pending("producer sender thread stopped");
        }
        self.fail_pending_flushes("producer sender thread stopped");
        if let Some(reply) = self.close_reply.take() {
            let _ = reply.send(Err(Error::Producer(ProducerError::RuntimeStoppedDuring {
                operation: "shutdown",
            })));
        }
    }
    .instrument(tracing::debug_span!(
        "producer_sender_thread",
        %client_id,
        transactional
    ))
    .await;
}
/// Performs one pass of sender work and reports whether the event loop
/// should keep running (the value of `finish_shutdown_if_ready`).
///
/// The pass runs in this order:
/// 1. Absorb finished produce requests and newly queued events.
/// 2. Process application events, unless a flush is pending.
/// 3. Expire batches, then refresh metadata when due.
/// 4. Ensure the idempotent producer id; on failure, fail every queued batch.
/// 5. Dispatch ready batches.
/// 6. Complete flushes and transactions.
async fn run_once(&mut self, rx: &mut mpsc::Receiver<ProducerRuntimeEvent>) -> bool {
    self.drain_produce_outcomes();
    self.drain_application_events(rx);

    // A pending flush holds back further application events until it completes.
    let flush_pending = !self.pending_flushes.is_empty();
    if !flush_pending {
        self.process_application_events().await;
    }

    self.fail_expired_batches();
    self.maybe_refresh_tracked_topics().await;
    if let Err(error) = self.ensure_idempotent_producer_initialized().await {
        self.accumulator.fail_all(&format!("{error:#}"));
    }

    self.send_ready_batches().await;
    // Pick up outcomes that arrived while batches were being dispatched.
    self.drain_produce_outcomes();
    self.maybe_complete_flushes();
    self.maybe_complete_transaction().await;
    self.finish_shutdown_if_ready()
}
/// Handles every produce outcome already waiting in the channel, without blocking.
fn drain_produce_outcomes(&mut self) {
    loop {
        let Ok(outcome) = self.produce_outcome_rx.try_recv() else {
            break;
        };
        self.handle_produce_outcome(outcome);
    }
}
/// Moves every application event already waiting in `rx` onto the local
/// queue, without blocking.
fn drain_application_events(&mut self, rx: &mut mpsc::Receiver<ProducerRuntimeEvent>) {
    self.application_events
        .extend(std::iter::from_fn(|| rx.try_recv().ok()));
}
/// Processes queued application events in FIFO order.
///
/// Cancelled events are answered with a cancellation and skipped. Processing
/// stops early, leaving later events queued, in three cases:
/// - An append must wait for buffer space; the event is put back at the front.
/// - A flush must wait for the accumulator to drain; the reply is parked in
///   `pending_flushes`.
/// - `SendOffsetsToTransaction` arrives while batches are undrained; the event
///   is put back at the front.
async fn process_application_events(&mut self) {
    while let Some(event) = self.application_events.pop_front() {
        if event.is_cancelled() {
            event.send_cancelled();
            continue;
        }
        trace!(
            event = producer_event_name(&event),
            pending_events = self.application_events.len(),
            "processing producer application event"
        );
        match event {
            ProducerRuntimeEvent::WarmUp { reply, .. } => {
                // Opens a bootstrap connection only to check reachability;
                // the connection itself is discarded.
                let result = connect_to_any_bootstrap(
                    &self.config.bootstrap_servers,
                    &self.config.client_id,
                    self.config.request_timeout,
                    self.config.security_protocol,
                    &self.config.tls,
                    &self.config.sasl,
                    &self.config.tcp_connector,
                )
                .await
                .map(|_| ())
                .map_err(Error::from);
                let _ = reply.send(result);
            }
            ProducerRuntimeEvent::BeginTransaction { reply, .. } => {
                let _ = reply.send(self.begin_transaction().await);
            }
            ProducerRuntimeEvent::InitTransactions { reply, .. } => {
                let _ = reply.send(self.ensure_transaction_initialized().await);
            }
            ProducerRuntimeEvent::AppendRecord {
                record,
                enqueued_at,
                cancellation,
                reply,
            } => {
                // Transactional producers may refuse appends depending on the
                // transaction state.
                if let Some(transaction) = self.transaction.as_ref()
                    && let Err(error) = transaction.ensure_append_allowed()
                {
                    let _ = reply.send(Err(error.into()));
                    continue;
                }
                match self.check_buffer_capacity(&record, enqueued_at) {
                    BufferCapacity::Ready => {}
                    BufferCapacity::Wait => {
                        // Re-queue at the front to keep append order, then stop
                        // until a later pass frees buffer space.
                        self.application_events.push_front(
                            ProducerRuntimeEvent::AppendRecord {
                                record,
                                enqueued_at,
                                cancellation,
                                reply,
                            },
                        );
                        break;
                    }
                    BufferCapacity::Reject(error) => {
                        let _ = reply.send(Err(error.into()));
                        continue;
                    }
                }
                if let Err(error) = self.append_record(record, reply).await {
                    warn!(error = %error, "failed to append produce record");
                }
            }
            ProducerRuntimeEvent::PartitionsFor { topic, reply, .. } => {
                let _ = reply.send(self.partitions_for(topic).await);
            }
            ProducerRuntimeEvent::Flush { reply, .. } => {
                // Park the reply until the accumulator drains; while any flush
                // is pending, `run_once` skips event processing.
                if self.accumulator.has_undrained() {
                    self.pending_flushes.push(reply);
                    break;
                }
                let _ = reply.send(Ok(()));
            }
            ProducerRuntimeEvent::CommitTransaction { reply, .. } => {
                // On success the reply is moved into the transaction's pending
                // completion; it is only answered here when the request fails.
                let mut reply = Some(reply);
                if let Err(error) =
                    self.request_transaction_completion(TransactionResult::Commit, &mut reply)
                    && let Some(reply) = reply
                {
                    let _ = reply.send(Err(error));
                }
            }
            ProducerRuntimeEvent::AbortTransaction { reply, .. } => {
                let mut reply = Some(reply);
                if let Err(error) =
                    self.request_transaction_completion(TransactionResult::Abort, &mut reply)
                    && let Some(reply) = reply
                {
                    let _ = reply.send(Err(error));
                }
            }
            ProducerRuntimeEvent::SendOffsetsToTransaction {
                offsets,
                group_metadata,
                cancellation,
                reply,
            } => {
                // Offsets are only sent once every queued batch has been drained.
                if self.accumulator.has_undrained() {
                    self.application_events.push_front(
                        ProducerRuntimeEvent::SendOffsetsToTransaction {
                            offsets,
                            group_metadata,
                            cancellation,
                            reply,
                        },
                    );
                    break;
                }
                let _ = reply.send(
                    self.send_offsets_to_transaction(offsets, group_metadata)
                        .await,
                );
            }
            ProducerRuntimeEvent::Shutdown { reply } => {
                // The reply is kept and answered when shutdown finishes (or
                // failed by `run` if the loop exits first).
                self.shutting_down = true;
                self.close_reply = Some(reply);
                if let Some(transaction) = self.transaction.as_mut() {
                    transaction.ensure_shutdown_abort();
                }
            }
        }
    }
}
/// Starts a new transaction, initializing the transactional producer first
/// if needed.
///
/// # Errors
/// Returns `ProducerError::NotTransactional` when no transactional id is
/// configured, plus any initialization or state-transition error.
async fn begin_transaction(&mut self) -> ClientResult<()> {
    if self.transaction.is_none() {
        return Err(ProducerError::NotTransactional.into());
    }
    self.ensure_transaction_initialized().await?;
    debug!("beginning producer transaction");
    match self.transaction.as_mut() {
        Some(manager) => {
            manager.begin()?;
            Ok(())
        }
        None => Err(ProducerError::TransactionManagerUnavailable {
            operation: "begin_transaction",
        }
        .into()),
    }
}
/// Asks the transaction manager to commit or abort the current transaction.
///
/// When the request creates a pending completion, `reply` is moved into it.
/// Otherwise `reply` is left untouched.
///
/// NOTE(review): the caller in `process_application_events` only answers a
/// leftover reply on error. A successful request that creates no pending
/// completion therefore drops the reply unanswered.
///
/// # Errors
/// Returns `ProducerError::NotTransactional` without a transaction manager, or
/// the error from `request_completion`.
fn request_transaction_completion(
    &mut self,
    result: TransactionResult,
    reply: &mut Option<oneshot::Sender<ClientResult<()>>>,
) -> ClientResult<()> {
    let Some(manager) = self.transaction.as_mut() else {
        return Err(ProducerError::NotTransactional.into());
    };
    manager.request_completion(result, None)?;
    if let Some(pending) = &mut manager.pending_completion {
        pending.reply = reply.take();
    }
    Ok(())
}
/// Decides whether `record` can be appended to the accumulator now.
///
/// - A record whose estimated size exceeds `max_request_size` or
///   `buffer_memory` is rejected outright, with the first limit exceeded
///   reported.
/// - Otherwise it is ready when it fits in the remaining buffer.
/// - If it does not fit, it waits until `max_block` has elapsed since
///   `enqueued_at`, after which it is rejected with `BufferExhausted`.
fn check_buffer_capacity(
    &self,
    record: &crate::types::ProduceRecord,
    enqueued_at: Instant,
) -> BufferCapacity {
    let required = estimate_record_size(record);
    let hard_limits = [
        ("max_request_size", self.config.max_request_size),
        ("buffer_memory", self.config.buffer_memory),
    ];
    if let Some(&(limit_name, limit)) = hard_limits
        .iter()
        .find(|(_, limit)| required > *limit)
    {
        return BufferCapacity::Reject(ProducerError::RecordTooLarge {
            size: required,
            limit_name,
            limit,
        });
    }

    let buffered = self.accumulator.estimated_bytes();
    if buffered.saturating_add(required) <= self.config.buffer_memory {
        BufferCapacity::Ready
    } else if enqueued_at.elapsed() < self.config.max_block {
        BufferCapacity::Wait
    } else {
        BufferCapacity::Reject(ProducerError::BufferExhausted {
            buffered,
            required,
            limit: self.config.buffer_memory,
            max_block_ms: self.config.max_block.as_millis(),
        })
    }
}
/// Assigns a partition to `record` and appends it to the accumulator.
///
/// The record's topic is tracked for metadata refreshes. A partition-resolution
/// failure is delivered through `reply` rather than returned, so this currently
/// always returns `Ok(())`. Emits queue/batch/accumulator telemetry and a debug
/// summary of the pending backlog.
async fn append_record(
    &mut self,
    mut record: crate::types::ProduceRecord,
    reply: oneshot::Sender<ClientResult<ProduceAck>>,
) -> ClientResult<()> {
    self.metadata.track(record.topic.clone());
    let resolved = self.resolve_partition_for_record(&record).await;
    let partition = match resolved {
        Ok(selected) => selected,
        Err(failure) => {
            // The caller learns about the failure through its reply channel.
            let _ = reply.send(Err(failure));
            return Ok(());
        }
    };
    record.partition = Some(partition);
    let topic = record.topic.clone();

    let opened_batch = self
        .accumulator
        .append(record, reply, self.config.batch_size);
    telemetry::record_producer_record_queued(&self.config.client_id, &topic, partition);
    if opened_batch {
        telemetry::record_producer_batch_opened(&self.config.client_id, &topic, partition);
    }

    let stats = self.accumulator.stats();
    let accumulator_metrics = ProducerAccumulatorMetrics {
        client_id: &self.config.client_id,
        batch_count: stats.batch_count,
        record_count: stats.record_count,
        estimated_bytes: stats.estimated_bytes,
        retrying_batch_count: stats.retrying_batch_count,
        oldest_batch_age_ms: stats.oldest_batch_age_ms,
    };
    telemetry::record_producer_accumulator(&accumulator_metrics);
    debug!(
        opened_batch,
        pending_batch_count = stats.batch_count,
        pending_record_count = stats.record_count,
        pending_batch_bytes = stats.estimated_bytes,
        retrying_batch_count = stats.retrying_batch_count,
        oldest_pending_batch_age_ms = stats.oldest_batch_age_ms,
        "queued producer record"
    );
    Ok(())
}
/// Returns the partition `record` should be written to.
///
/// An explicitly set partition is used as-is. Otherwise the topic's partitions
/// are looked up, refreshing metadata if needed, and the configured partitioner
/// picks one.
///
/// # Errors
/// Propagates metadata lookup failures. Returns
/// `ProducerError::TopicHasNoPartitions` when the topic has no partitions.
async fn resolve_partition_for_record(
    &mut self,
    record: &crate::types::ProduceRecord,
) -> ClientResult<i32> {
    match record.partition {
        Some(explicit) => Ok(explicit),
        None => {
            let candidates = self.partitions_for_topic(&record.topic).await?;
            if candidates.is_empty() {
                return Err(ProducerError::TopicHasNoPartitions {
                    topic: record.topic.clone(),
                }
                .into());
            }
            let chosen = self.partitioner.partition(
                self.config.partitioner,
                record,
                &candidates,
                &self.accumulator,
                self.config.batch_size,
                self.config.partitioner_ignore_keys,
            );
            Ok(chosen)
        }
    }
}
async fn partitions_for_topic(
&mut self,
topic: &str,
) -> ClientResult<Vec<(i32, crate::metadata::PartitionMetadata)>> {
if self.metadata.cache().partitions_for(topic).is_none()
|| self
.metadata
.cache()
.needs_refresh(topic, self.config.metadata_max_age)
{
self.metadata
.refresh_topics(&self.config, vec![topic.to_owned()])
.await?;
self.retain_connections_for_metadata();
}
self.metadata.cache().partitions_for(topic).ok_or_else(|| {
Error::Producer(ProducerError::MissingPartitionMetadata {
topic: topic.to_owned(),
})
})
}
/// Returns public partition information for `topic` (surrounding whitespace
/// trimmed).
///
/// The metadata lookup is bounded by `max_block`.
///
/// # Errors
/// Returns `ProducerError::MetadataTimeout` when the lookup exceeds
/// `max_block`; otherwise propagates lookup errors.
async fn partitions_for(&mut self, topic: String) -> ClientResult<Vec<TopicPartitionInfo>> {
    let topic = topic.trim().to_owned();
    let max_block = self.config.max_block;
    let partitions = match timeout(max_block, self.partitions_for_topic(&topic)).await {
        Ok(lookup) => lookup?,
        Err(_elapsed) => {
            return Err(ProducerError::MetadataTimeout {
                topic: topic.clone(),
                max_block_ms: max_block.as_millis(),
            }
            .into());
        }
    };
    let mut infos = Vec::with_capacity(partitions.len());
    for (partition, metadata) in partitions {
        infos.push(TopicPartitionInfo {
            partition,
            leader_id: metadata.leader_id,
            leader_epoch: metadata.leader_epoch,
            replica_nodes: metadata.replica_nodes,
            isr_nodes: metadata.isr_nodes,
            offline_replicas: metadata.offline_replicas,
        });
    }
    Ok(infos)
}
/// Ensures the transactional producer has a producer id and epoch.
///
/// Does nothing when already initialized. Otherwise it locates the transaction
/// coordinator, obtains a producer identity, and stores both in the transaction
/// manager.
///
/// # Errors
/// Fails when no transactional id / manager is configured, or when
/// coordinator discovery or producer-id initialization fails.
async fn ensure_transaction_initialized(&mut self) -> ClientResult<()> {
    let already_initialized = self
        .transaction
        .as_ref()
        .is_some_and(|manager| manager.producer.is_some());
    if already_initialized {
        return Ok(());
    }
    let transactional_id = self.transactional_id()?.to_owned();
    let transaction_timeout = match self.transaction.as_ref() {
        Some(manager) => manager.transaction_timeout,
        None => {
            return Err(ProducerError::TransactionManagerUnavailable {
                operation: "init_transactions",
            }
            .into());
        }
    };
    let coordinator = self.find_transaction_coordinator(&transactional_id).await?;
    let producer = self
        .init_transactional_producer(&transactional_id, transaction_timeout)
        .await?;
    let Some(manager) = self.transaction.as_mut() else {
        return Err(ProducerError::TransactionManagerUnavailable {
            operation: "init_transactions",
        }
        .into());
    };
    manager.coordinator = Some(coordinator);
    manager.reset_with_new_producer(producer);
    debug!(%transactional_id, producer_id = producer.id, producer_epoch = producer.epoch, "initialized transactional producer");
    Ok(())
}
/// Obtains a producer id for a non-transactional idempotent producer when
/// one is needed and not yet held, and resets all idempotent sequences.
///
/// Does nothing for transactional or non-idempotent producers.
///
/// # Errors
/// Propagates failures from `init_idempotent_producer`.
async fn ensure_idempotent_producer_initialized(&mut self) -> ClientResult<()> {
    let needs_producer_id = self.transaction.is_none()
        && self.config.is_idempotent()
        && self.idempotent_producer.is_none();
    if !needs_producer_id {
        return Ok(());
    }
    let producer = self.init_idempotent_producer().await?;
    self.idempotent_sequences.clear();
    self.idempotent_producer = Some(producer);
    debug!(
        producer_id = producer.id,
        producer_epoch = producer.epoch,
        "initialized idempotent producer"
    );
    Ok(())
}
/// Adds consumer-group offsets to the current transaction.
///
/// Steps, in order:
/// 1. Validate `group_metadata`.
/// 2. Check that the transaction state allows sending offsets.
/// 3. Initialize the transactional producer if needed.
/// 4. Commit the offsets through `commit_transactional_offsets`.
///
/// # Errors
/// Returns `ProducerError::NotTransactional` when no transaction is configured,
/// plus any validation, state, initialization or commit error.
async fn send_offsets_to_transaction(
    &mut self,
    offsets: Vec<CommitOffset>,
    group_metadata: ConsumerGroupMetadata,
) -> ClientResult<()> {
    validate_group_metadata(&group_metadata)?;
    match self.transaction.as_ref() {
        Some(manager) => {
            manager.ensure_send_offsets_allowed()?;
        }
        None => return Err(ProducerError::NotTransactional.into()),
    }
    self.ensure_transaction_initialized().await?;
    let Some(producer) = self
        .transaction
        .as_ref()
        .and_then(|manager| manager.producer)
    else {
        return Err(ProducerError::TransactionalProducerNotInitialized {
            operation: "send_offsets_to_transaction",
        }
        .into());
    };
    self.commit_transactional_offsets(&offsets, &group_metadata, producer)
        .await
}
/// Commits consumer-group offsets as part of the current transaction using
/// `TxnOffsetCommit` against the group coordinator.
///
/// Makes up to `max(max_retries, 10) + 1` attempts. Each attempt re-discovers
/// the group coordinator and opens a fresh connection. Only the first partition
/// error in the response is inspected. Coordinator errors
/// (`CoordinatorLoadInProgress`, `CoordinatorNotAvailable`, `NotCoordinator`)
/// are retried after `retry_backoff`.
///
/// # Errors
/// - Coordinator lookup, connection, feature and API-version failures
///   propagate unchanged.
/// - A failure sending the request marks the transaction abort-only and
///   returns `ProducerError::TransactionAbortRequired`.
/// - Any other partition error is classified as abort-only or fatal. The
///   transaction is marked accordingly and a `BrokerError` is returned.
async fn commit_transactional_offsets(
    &mut self,
    offsets: &[CommitOffset],
    group_metadata: &ConsumerGroupMetadata,
    producer: ProducerIdentity,
) -> ClientResult<()> {
    let transactional_id = self.transactional_id()?.to_owned();
    // At least 10 retries regardless of `max_retries`.
    let attempts = self.config.max_retries.max(10);
    for attempt in 0..=attempts {
        let coordinator = self
            .find_group_coordinator(&group_metadata.group_id)
            .await?;
        let mut connection = BrokerConnection::connect_with_transport(
            &coordinator.address,
            &self.config.client_id,
            self.config.request_timeout,
            self.config.security_protocol,
            &self.config.tls,
            &self.config.sasl,
            &self.config.tcp_connector,
        )
        .await?;
        ensure_transaction_v2_feature(&connection)?;
        let version = connection
            .version_with_cap::<TxnOffsetCommitRequest>(TXN_OFFSET_COMMIT_VERSION_CAP)?;
        if version < TXN_OFFSET_COMMIT_TRANSACTION_V2_MIN_VERSION {
            return Err(ProducerError::UnsupportedApiVersion {
                api: "TxnOffsetCommit",
                min_version: TXN_OFFSET_COMMIT_TRANSACTION_V2_MIN_VERSION,
                broker_version: version,
            }
            .into());
        }
        let request = build_txn_offset_commit_request(
            &transactional_id,
            producer,
            offsets,
            group_metadata,
        );
        let response: TxnOffsetCommitResponse = match connection
            .send_request::<TxnOffsetCommitRequest>(&self.config.client_id, version, &request)
            .await
        {
            Ok(response) => response,
            Err(error) => {
                // The outcome of the commit is unknown, so the transaction
                // must be aborted rather than retried.
                let message = format!("{error:#}");
                self.mark_transaction_abort_only(message.clone());
                return Err(ProducerError::TransactionAbortRequired {
                    operation: "send_offsets_to_transaction",
                    message,
                }
                .into());
            }
        };
        // First partition-level error across all topics, if any.
        let response_error = response.topics.iter().find_map(|topic| {
            topic
                .partitions
                .iter()
                .find_map(|partition| partition.error_code.err())
        });
        let Some(error) = response_error else {
            return Ok(());
        };
        if attempt < attempts
            && matches!(
                error,
                ResponseError::CoordinatorLoadInProgress
                    | ResponseError::CoordinatorNotAvailable
                    | ResponseError::NotCoordinator
            )
        {
            sleep(self.config.retry_backoff).await;
            continue;
        }
        let mut broker_error = BrokerError::response(
            "send_offsets_to_transaction",
            Some(group_metadata.group_id.clone()),
            error,
        );
        let message = broker_error.to_string();
        match classify_transactional_error(error) {
            TransactionFailureDisposition::AbortOnly => {
                broker_error = broker_error.transaction_abort_required();
                self.mark_transaction_abort_only(message.clone());
            }
            TransactionFailureDisposition::Fatal => {
                broker_error = broker_error.fatal();
                self.mark_transaction_fatal(message.clone())
            }
        }
        return Err(Error::Broker(broker_error));
    }
    // Every iteration with `attempt == attempts` returns above, so this is a
    // defensive fallback required by the loop's type.
    Err(ProducerError::AttemptsExhausted {
        operation: "send_offsets_to_transaction",
        resource: format!(" for group '{}'", group_metadata.group_id),
        attempts: attempts + 1,
    }
    .into())
}
/// Refreshes metadata for all tracked topics when the periodic refresh is due.
///
/// Failures are logged and otherwise ignored. After a successful refresh,
/// pooled connections are pruned to match the new metadata.
async fn maybe_refresh_tracked_topics(&mut self) {
    let due = self
        .metadata
        .needs_periodic_refresh(self.config.metadata_max_age);
    if due {
        match self.metadata.refresh_tracked(&self.config).await {
            Ok(_) => {
                self.retain_connections_for_metadata();
            }
            Err(error) => {
                warn!(error = %error, "producer metadata refresh failed");
            }
        }
    }
}
/// Drains every sendable batch from the accumulator and dispatches one produce
/// request per leader on a spawned task.
///
/// Repeats until nothing more can be drained. Partitions without a known
/// leader trigger a metadata refresh before retrying. Leaders already at
/// `max_in_flight_requests_per_connection` are skipped.
///
/// For each leader group:
/// - Groups over `max_request_size` fail.
/// - If producer state cannot be assigned, or the request cannot be built, the
///   batches fail and the transaction is marked abort-only.
/// - If no connection is available, the batches go to transport-failure
///   handling.
async fn send_ready_batches(&mut self) {
    loop {
        let ready =
            self.accumulator
                .ready(self.metadata.cache(), &self.config, self.shutting_down);
        if !ready.unknown_leaders.is_empty() {
            // NOTE(review): relies on `resolve_unknown_leaders` removing or
            // rescheduling the affected batches so the next `ready()` call
            // makes progress.
            self.resolve_unknown_leaders(ready.unknown_leaders).await;
            continue;
        }
        let mut ready_by_leader = ready.ready_by_leader;
        // Respect the per-leader in-flight request limit.
        ready_by_leader.retain(|leader_id, _| self.can_send_more_to_leader(*leader_id));
        let drained = self.accumulator.drain_ready(ready_by_leader);
        if drained.is_empty() {
            break;
        }
        debug!(
            leader_count = drained.len(),
            batch_count = drained.values().map(Vec::len).sum::<usize>(),
            record_count = drained
                .values()
                .flat_map(|batches| batches.iter())
                .map(|batch| batch.records.len())
                .sum::<usize>(),
            batch_bytes = drained
                .values()
                .flat_map(|batches| batches.iter())
                .map(|batch| batch.estimated_bytes)
                .sum::<usize>(),
            "sending ready producer batches"
        );
        for (leader_id, mut batches) in drained {
            if let Some(error) = self.request_size_error(&batches) {
                self.fail_batches_with_error(batches, error.into());
                continue;
            }
            // Assigns producer id/epoch/sequence to batches that lack them.
            if let Err(error) = self.prepare_batches_for_send(&mut batches) {
                let message = format!("{error:#}");
                self.fail_batches(batches, &message);
                self.mark_transaction_abort_only(message);
                continue;
            }
            match self.build_leader_produce_request(leader_id, &batches) {
                Ok(send) => match self.take_connection_for(leader_id).await {
                    Ok(connection) => {
                        // Counted before spawning so the next loop iteration
                        // sees the in-flight request.
                        self.mark_produce_in_flight(leader_id);
                        let client_id = self.config.client_id.clone();
                        let outcome_tx = self.produce_outcome_tx.clone();
                        tokio::spawn(async move {
                            let outcome = send_leader_produce_request(
                                connection, client_id, send, batches,
                            )
                            .await;
                            let _ = outcome_tx.send(outcome);
                        });
                    }
                    Err(error) => {
                        self.handle_transport_failure(leader_id, batches, error);
                    }
                },
                Err(error) => {
                    self.fail_batches(batches, &format!("{error:#}"));
                    self.mark_transaction_abort_only(format!("{error:#}"));
                }
            }
        }
    }
}
/// Returns whether another produce request may be dispatched to `leader_id`
/// without reaching `max_in_flight_requests_per_connection`.
fn can_send_more_to_leader(&self, leader_id: i32) -> bool {
    let in_flight: usize = self
        .in_flight_produce_by_leader
        .get(&leader_id)
        .copied()
        .unwrap_or_default();
    in_flight < self.config.max_in_flight_requests_per_connection
}
/// Records a newly dispatched produce request to `leader_id`.
fn mark_produce_in_flight(&mut self, leader_id: i32) {
    self.in_flight_produce_by_leader
        .entry(leader_id)
        .and_modify(|count| *count += 1)
        .or_insert(1);
    self.in_flight_produce_count = self.in_flight_produce_count.saturating_add(1);
}
/// Records completion of a produce request to `leader_id`.
///
/// The per-leader entry is removed once its count reaches zero.
fn mark_produce_complete(&mut self, leader_id: i32) {
    self.in_flight_produce_count = self.in_flight_produce_count.saturating_sub(1);
    let Some(remaining) = self.in_flight_produce_by_leader.get_mut(&leader_id) else {
        return;
    };
    *remaining = remaining.saturating_sub(1);
    if *remaining == 0 {
        self.in_flight_produce_by_leader.remove(&leader_id);
    }
}
fn handle_produce_outcome(&mut self, outcome: LeaderProduceOutcome) {
self.mark_produce_complete(outcome.leader_id);
match outcome.result {
Ok((connection, response)) => {
self.connections
.entry(outcome.leader_id)
.or_default()
.push(connection);
if let Some(response) = response {
self.handle_produce_response(outcome.leader_id, outcome.batches, response);
} else {
self.handle_produce_without_response(outcome.batches);
}
}
Err(error) => {
self.handle_transport_failure(outcome.leader_id, outcome.batches, error);
}
}
}
/// Assigns producer state (id, epoch, base sequence) to every batch that does
/// not already have it.
///
/// Batches that already carry state, such as retries, keep their original
/// state.
///
/// # Errors
/// Stops at the first batch whose state cannot be assigned.
fn prepare_batches_for_send(&mut self, batches: &mut [ProducerBatch]) -> AnyResult<()> {
    for batch in batches
        .iter_mut()
        .filter(|batch| batch.producer_state.is_none())
    {
        let assigned = self.producer_state_for_batch(&batch.key, batch.records.len())?;
        batch.producer_state = assigned;
    }
    Ok(())
}
/// Assigns producer id, epoch and base sequence to a batch of `record_count`
/// records for partition `key`, and advances that partition's next sequence.
///
/// Transactional producers use the transaction's producer identity and its
/// per-partition sequences. Non-transactional idempotent producers use
/// `idempotent_producer` and `idempotent_sequences`. Returns `Ok(None)` when
/// neither applies.
///
/// Sequence numbers wrap from `i32::MAX` back to 0, as the Kafka protocol
/// requires. The earlier saturating add pinned the sequence at `i32::MAX`,
/// after which every batch reused the same base sequence and was rejected by
/// the broker.
///
/// # Errors
/// Fails if a transaction is configured but its producer was not initialized.
fn producer_state_for_batch(
    &mut self,
    key: &TopicPartitionKey,
    record_count: usize,
) -> AnyResult<Option<BatchProducerState>> {
    /// Advances a producer sequence by `increment`, wrapping modulo 2^31 so
    /// the value after `i32::MAX` is 0 (Kafka's `incrementSequence` rule).
    fn wrapping_sequence_add(sequence: i32, increment: usize) -> i32 {
        const SEQUENCE_SPACE: i64 = 1 << 31;
        let step = i64::try_from(increment).unwrap_or(i64::MAX) % SEQUENCE_SPACE;
        // Both operands are < 2^31, so the sum cannot overflow an i64.
        let next = (i64::from(sequence) + step).rem_euclid(SEQUENCE_SPACE);
        i32::try_from(next).expect("a value reduced modulo 2^31 fits in i32")
    }

    if let Some(transaction) = self.transaction.as_mut() {
        let producer = transaction
            .producer
            .context("transactional producer was not initialized")?;
        let next_sequence = transaction.sequences.entry(key.clone()).or_insert(0);
        let batch_state = BatchProducerState {
            producer_id: producer.id,
            producer_epoch: producer.epoch,
            base_sequence: *next_sequence,
            transactional: true,
        };
        *next_sequence = wrapping_sequence_add(*next_sequence, record_count);
        return Ok(Some(batch_state));
    }
    if let Some(producer) = self.idempotent_producer {
        let next_sequence = self.idempotent_sequences.entry(key.clone()).or_insert(0);
        let batch_state = BatchProducerState {
            producer_id: producer.id,
            producer_epoch: producer.epoch,
            base_sequence: *next_sequence,
            transactional: false,
        };
        *next_sequence = wrapping_sequence_add(*next_sequence, record_count);
        return Ok(Some(batch_state));
    }
    Ok(None)
}
/// Refreshes metadata for partitions whose leader is unknown.
///
/// Batches for partitions that still have no leader afterwards are drained
/// and passed to `retry_or_fail_batch`. A failed refresh is logged but not
/// propagated; the partitions are then re-checked against whatever metadata
/// is cached. Each still-unresolved batch also invalidates its topic's
/// metadata.
async fn resolve_unknown_leaders(&mut self, partitions: Vec<TopicPartitionKey>) {
    let topics: HashSet<String> = partitions.iter().map(|key| key.topic.clone()).collect();
    debug!(
        topic_count = topics.len(),
        partition_count = partitions.len(),
        "refreshing metadata for producer partitions without leaders"
    );
    if let Err(error) = self
        .metadata
        .refresh_topics(&self.config, topics.iter().cloned().collect())
        .await
    {
        warn!(error = %error, "producer metadata refresh for unknown leaders failed");
    } else {
        self.retain_connections_for_metadata();
    }
    // Keep only the partitions that still lack a leader.
    let unresolved = partitions
        .into_iter()
        .filter(|key| {
            self.metadata
                .leader_for(&key.topic, key.partition)
                .is_none()
        })
        .collect();
    for batch in self.accumulator.drain_unknown_leader(unresolved) {
        let topic = batch.key.topic.clone();
        let partition = batch.key.partition;
        self.metadata.invalidate_topic(&batch.key.topic);
        self.retry_or_fail_batch(
            batch,
            format!(
                "no leader metadata for topic '{}' partition {}",
                topic, partition
            ),
        );
    }
}
/// Builds the produce request for `batches` destined for `leader_id`.
///
/// Also computes the summary figures carried on the returned request for
/// tracing and telemetry: batch and record counts, estimated bytes, and the age
/// of the oldest batch. The request is transactional when
/// `transactional_id_if_active` yields an id. `expects_response` is `false`
/// when the effective acks value is 0.
///
/// # Errors
/// Propagates failures from `build_produce_request`.
fn build_leader_produce_request(
    &mut self,
    leader_id: i32,
    batches: &[ProducerBatch],
) -> AnyResult<LeaderProduceRequest> {
    let batch_count = batches.len();
    let record_count = batches
        .iter()
        .map(|batch| batch.records.len())
        .sum::<usize>();
    let batch_bytes = batches
        .iter()
        .map(|batch| batch.estimated_bytes)
        .sum::<usize>();
    let oldest_batch_age_ms = batches
        .iter()
        .map(|batch| batch.created_at.elapsed().as_millis())
        .max()
        .unwrap_or(0);
    debug!(
        leader_id,
        batch_count,
        record_count,
        batch_bytes,
        oldest_batch_age_ms,
        "dispatching produce request to leader"
    );
    let transactional_id = self.transactional_id_if_active().map(str::to_owned);
    let build_span = info_span!(
        "producer.build_produce_request",
        leader_id,
        batch_count,
        record_count,
        batch_bytes,
        transactional = transactional_id.is_some()
    );
    // `acks_for_request` receives whether the request is transactional;
    // presumably it forces the acks value transactions require (verify).
    let acks = acks_for_request(self.config.acks, transactional_id.is_some());
    let request = {
        // The span is entered only while the request is being built.
        let _guard = build_span.enter();
        build_produce_request(
            batches,
            acks,
            self.config.request_timeout,
            self.config.compression,
            transactional_id.as_deref(),
        )?
    };
    Ok(LeaderProduceRequest {
        leader_id,
        request,
        transactional: transactional_id.is_some(),
        expects_response: acks != 0,
        batch_count,
        record_count,
        batch_bytes,
        oldest_batch_age_ms,
    })
}
/// Returns `RequestTooLarge` when the combined estimated size of `batches`
/// exceeds `max_request_size`, otherwise `None`.
fn request_size_error(&self, batches: &[ProducerBatch]) -> Option<ProducerError> {
    let limit = self.config.max_request_size;
    let total: usize = batches.iter().map(|batch| batch.estimated_bytes).sum();
    if total > limit {
        Some(ProducerError::RequestTooLarge { size: total, limit })
    } else {
        None
    }
}
/// Returns a connection to `broker_id`.
///
/// Reuses a pooled connection when one is idle, removing the pool entry if it
/// becomes empty. Otherwise opens a new connection to the broker's address
/// from cached metadata.
///
/// # Errors
/// Fails if the broker is missing from metadata or the connection cannot be
/// opened.
async fn take_connection_for(&mut self, broker_id: i32) -> AnyResult<BrokerConnection> {
    let pooled = self
        .connections
        .get_mut(&broker_id)
        .and_then(|pool| pool.pop().map(|connection| (connection, pool.is_empty())));
    if let Some((connection, pool_drained)) = pooled {
        if pool_drained {
            self.connections.remove(&broker_id);
        }
        return Ok(connection);
    }

    let broker = self
        .metadata
        .broker(broker_id)
        .cloned()
        .with_context(|| format!("broker {broker_id} is missing from metadata"))?;
    let address = broker.address();
    let fresh = BrokerConnection::connect_with_transport(
        &address,
        &self.config.client_id,
        self.config.request_timeout,
        self.config.security_protocol,
        &self.config.tls,
        &self.config.sasl,
        &self.config.tcp_connector,
    )
    .await?;
    trace!(broker_id, %address, "opened producer broker connection");
    Ok(fresh)
}
/// Applies a produce response to the batches sent in the request.
///
/// Each partition response is matched to its batch by topic and partition.
/// Errors go to `handle_kafka_failure`; successes complete the batch with the
/// returned base offset and log-append time. Partition responses without a
/// matching in-flight batch are ignored. Batches that received no partition
/// response are handed to `handle_transport_failure`.
fn handle_produce_response(
    &mut self,
    leader_id: i32,
    batches: Vec<ProducerBatch>,
    response: ProduceResponse,
) {
    // NOTE(review): keyed by partition, so this assumes at most one batch per
    // partition in a single request; confirm against `drain_ready`.
    let mut in_flight = batches
        .into_iter()
        .map(|batch| (batch.key.clone(), batch))
        .collect::<HashMap<_, _>>();
    for topic_response in response.responses {
        let topic = topic_response.name.0.to_string();
        for partition_response in topic_response.partition_responses {
            let key = TopicPartitionKey::new(topic.clone(), partition_response.index);
            // Removing the batch ensures each one is resolved at most once.
            let Some(batch) = in_flight.remove(&key) else {
                continue;
            };
            if let Some(error) = partition_response.error_code.err() {
                self.handle_kafka_failure(leader_id, batch, error);
                continue;
            }
            self.complete_batch_success(
                batch,
                partition_response.base_offset,
                partition_response.log_append_time_ms,
            );
        }
    }
    // Anything left over was not mentioned in the response.
    for batch in in_flight.into_values() {
        let topic = batch.key.topic.clone();
        let partition = batch.key.partition;
        self.handle_transport_failure(
            leader_id,
            vec![batch],
            anyhow!("missing produce response for {}:{}", topic, partition),
        );
    }
}
/// Completes batches sent without expecting a response (acks=0).
///
/// No offsets are known, so every ack reports `-1` for both the offset and the
/// log-append time.
fn handle_produce_without_response(&self, batches: Vec<ProducerBatch>) {
    batches
        .into_iter()
        .for_each(|batch| self.complete_batch_success(batch, -1, -1));
}
/// Reports success for every record in `batch`.
///
/// A record's offset is `base_offset + index`. When `base_offset` is `-1`
/// (offset unknown), every record reports `-1`. Batch-success telemetry is
/// recorded before any reply is sent. Replies whose receiver has gone away are
/// ignored.
fn complete_batch_success(
    &self,
    mut batch: ProducerBatch,
    base_offset: i64,
    log_append_time_ms: i64,
) {
    let metrics = ProducerBatchMetrics {
        client_id: &self.config.client_id,
        topic: &batch.key.topic,
        partition: batch.key.partition,
        record_count: batch.records.len(),
        duration: batch.created_at.elapsed(),
    };
    telemetry::record_producer_batch_succeeded(&metrics);

    let offset_unknown = base_offset == -1;
    for (position, reply) in batch.replies.drain(..).enumerate() {
        let offset = if offset_unknown {
            base_offset
        } else {
            base_offset + i64::try_from(position).unwrap_or(0)
        };
        let ack = ProduceAck {
            topic: batch.key.topic.clone(),
            partition: batch.key.partition,
            base_offset,
            offset,
            log_append_time_ms,
        };
        let _ = reply.send(Ok(ack));
    }
}
/// Handles a partition-level broker error for `batch`.
///
/// Branches are checked in this order:
/// - `MessageTooLarge` on a splittable batch: split and re-enqueue at the front.
/// - `DuplicateSequenceNumber` on a batch with producer state: treated as a
///   success with offsets reported as `-1`.
/// - `UnknownProducerId` / `OutOfOrderSequenceNumber` on a non-transactional
///   idempotent batch: drop the producer id and the partition's sequence,
///   clear the batch's producer state, then `retry_or_fail_batch`.
/// - Other retriable errors: drop pooled connections to the leader, invalidate
///   the topic's metadata, then `retry_or_fail_batch`.
/// - `ConcurrentTransactions` on a transactional batch: `retry_or_fail_batch`.
/// - Any other error on a transactional batch: mark the transaction
///   abort-only or fatal, fail this batch, and fail every queued batch.
/// - Otherwise: fail the batch, resetting idempotent state first when
///   applicable.
fn handle_kafka_failure(&mut self, leader_id: i32, batch: ProducerBatch, error: ResponseError) {
    // NOTE(review): failure telemetry and the warning below are emitted before
    // branching, so batches that are later retried or treated as success
    // (`DuplicateSequenceNumber`) are still counted as failures here.
    self.record_batch_failure(&batch, "broker", Some(error.to_string()));
    warn!(
        leader_id,
        topic = %batch.key.topic,
        partition = batch.key.partition,
        record_count = batch.records.len(),
        retriable = error.is_retriable(),
        error = %error,
        "producer batch failed with broker error"
    );
    if error == ResponseError::MessageTooLarge && batch.can_split() {
        warn!(
            leader_id,
            topic = %batch.key.topic,
            partition = batch.key.partition,
            record_count = batch.records.len(),
            "splitting oversized producer batch for retry"
        );
        self.accumulator.split_and_reenqueue_front(batch);
        return;
    }
    // The broker already holds this sequence, so the records were written.
    if error == ResponseError::DuplicateSequenceNumber && batch.producer_state.is_some() {
        self.complete_batch_success(batch, -1, -1);
        return;
    }
    if Self::is_recoverable_idempotent_sequence_error(error)
        && self.is_non_transactional_idempotent_batch(&batch)
    {
        // Clearing the producer state makes the retry pick up a fresh
        // producer id and sequence.
        let mut batch = batch;
        self.reset_idempotent_producer_for_partition(&batch.key);
        batch.producer_state = None;
        self.retry_or_fail_batch(batch, format!("produce rejected by broker: {error}"));
        return;
    }
    if error.is_retriable() {
        self.connections.remove(&leader_id);
        self.metadata.invalidate_topic(&batch.key.topic);
        self.retry_or_fail_batch(batch, format!("produce rejected by broker: {error}"));
        return;
    }
    if error == ResponseError::ConcurrentTransactions
        && self.transaction.is_some()
        && batch.producer_state.is_some()
    {
        self.retry_or_fail_batch(batch, format!("produce rejected by broker: {error}"));
        return;
    }
    let resource = format!("{}:{}", batch.key.topic, batch.key.partition);
    let mut broker_error = BrokerError::response("produce", Some(resource), error);
    let message = broker_error.to_string();
    if self.transaction.is_some() && batch.producer_state.is_some() {
        match classify_transactional_error(error) {
            TransactionFailureDisposition::AbortOnly => {
                broker_error = broker_error.transaction_abort_required();
                self.mark_transaction_abort_only(message.clone())
            }
            TransactionFailureDisposition::Fatal => {
                broker_error = broker_error.fatal();
                self.mark_transaction_fatal(message.clone())
            }
        }
        // The transaction cannot complete, so every queued batch fails too.
        fail_batch_with_error(batch, &Error::Broker(broker_error));
        self.accumulator.fail_all(&message);
        return;
    }
    if self.is_non_transactional_idempotent_batch(&batch) {
        self.reset_idempotent_producer_for_partition(&batch.key);
    }
    fail_batch_with_error(batch, &Error::Broker(broker_error));
}
/// Reports whether `error` is one of the sequence-tracking errors (`UnknownProducerId`,
/// `OutOfOrderSequenceNumber`) that a non-transactional idempotent producer recovers from
/// by resetting its producer state and retrying.
fn is_recoverable_idempotent_sequence_error(error: ResponseError) -> bool {
    match error {
        ResponseError::UnknownProducerId | ResponseError::OutOfOrderSequenceNumber => true,
        _ => false,
    }
}
fn is_non_transactional_idempotent_batch(&self, batch: &ProducerBatch) -> bool {
self.transaction.is_none()
&& batch
.producer_state
.is_some_and(|state| !state.transactional)
}
/// Discards the cached idempotent producer identity and the sequence counter tracked for
/// `key`, so that a later send starts from fresh producer state.
fn reset_idempotent_producer_for_partition(&mut self, key: &TopicPartitionKey) {
    let sequences = &mut self.idempotent_sequences;
    sequences.remove(key);
    self.idempotent_producer = None;
    debug!(
        topic = %key.topic,
        partition = key.partition,
        "reset idempotent producer state for partition"
    );
}
/// Handles a connection-level failure while talking to broker `leader_id`.
///
/// The cached connection to that broker is dropped. For every affected batch, failure
/// telemetry (reason `"transport"`) is recorded and the topic's metadata is invalidated.
/// The batch is then handed to `retry_or_fail_batch` with the alternate-formatted `error`.
fn handle_transport_failure(
    &mut self,
    leader_id: i32,
    batches: Vec<ProducerBatch>,
    error: anyhow::Error,
) {
    let batch_count = batches.len();
    warn!(
        leader_id,
        batch_count,
        error = %error,
        "producer transport failure while sending batches"
    );
    self.connections.remove(&leader_id);
    let reason = format!("{error:#}");
    for batch in batches {
        self.metadata.invalidate_topic(&batch.key.topic);
        self.record_batch_failure(&batch, "transport", None);
        self.retry_or_fail_batch(batch, reason.clone());
    }
}
/// Re-enqueues `batch` for another attempt, or fails it when it can no longer be retried.
///
/// - If the batch is older than `delivery_timeout`, it goes through `fail_delivery_timeout`.
/// - Else, if it still has retries left (`attempts < max_retries`), its attempt counter is
///   bumped, it is scheduled `retry_backoff` from now, and it is pushed to the accumulator front.
/// - Otherwise it is failed with `message`. When it belongs to the active transaction, every
///   queued batch is also failed and the transaction becomes abort-only.
fn retry_or_fail_batch(&mut self, mut batch: ProducerBatch, message: String) {
    let delivery_timeout = self.config.delivery_timeout;
    if batch.created_at.elapsed() >= delivery_timeout {
        let timeout_message = format!("delivery timed out after {delivery_timeout:?}: {message}");
        warn!(
            topic = %batch.key.topic,
            partition = batch.key.partition,
            attempts = batch.attempts,
            "producer batch exceeded delivery timeout"
        );
        self.fail_delivery_timeout(batch, timeout_message);
        return;
    }
    if batch.attempts < self.config.max_retries {
        batch.attempts += 1;
        batch.retry_at = Some(Instant::now() + self.config.retry_backoff);
        debug!(
            topic = %batch.key.topic,
            partition = batch.key.partition,
            attempts = batch.attempts,
            retry_backoff_ms = self.config.retry_backoff.as_millis(),
            "reenqueuing producer batch for retry"
        );
        self.accumulator.reenqueue_front(batch);
        return;
    }
    warn!(
        topic = %batch.key.topic,
        partition = batch.key.partition,
        attempts = batch.attempts,
        max_retries = self.config.max_retries,
        "producer batch exhausted retries"
    );
    let poisons_transaction = self.transaction.is_some() && batch.producer_state.is_some();
    fail_batch(batch, &message);
    if poisons_transaction {
        self.accumulator.fail_all(&message);
        self.mark_transaction_abort_only(message);
    }
}
/// Removes every batch that has outlived `delivery_timeout` from the accumulator. Each one
/// is failed through `fail_delivery_timeout` with a message naming its topic and partition.
fn fail_expired_batches(&mut self) {
    let delivery_timeout = self.config.delivery_timeout;
    for batch in self.accumulator.expire_batches(delivery_timeout) {
        let reason = format!(
            "delivery timed out after {:?} for {}:{}",
            delivery_timeout, batch.key.topic, batch.key.partition
        );
        self.fail_delivery_timeout(batch, reason);
    }
}
/// Fails `batch` because its delivery timeout elapsed, recording `"delivery_timeout"`
/// telemetry. A transactional batch also fails every queued batch and marks the
/// transaction abort-only.
fn fail_delivery_timeout(&mut self, batch: ProducerBatch, message: String) {
    self.record_batch_failure(&batch, "delivery_timeout", None);
    let poisons_transaction = self.transaction.is_some() && batch.producer_state.is_some();
    fail_batch(batch, &message);
    if poisons_transaction {
        self.accumulator.fail_all(&message);
        self.mark_transaction_abort_only(message);
    }
}
/// Fails each batch with the plain-text `message`, recording `"internal"` failure telemetry
/// for each one first.
fn fail_batches(&self, batches: Vec<ProducerBatch>, message: &str) {
    batches.into_iter().for_each(|batch| {
        self.record_batch_failure(&batch, "internal", None);
        fail_batch(batch, message);
    });
}
/// Fails each batch with the structured `error`, recording `"internal"` failure telemetry
/// that carries the error's display text.
fn fail_batches_with_error(&self, batches: Vec<ProducerBatch>, error: Error) {
    // The rendered text is the same for every batch, so format it once.
    let description = error.to_string();
    for batch in batches {
        self.record_batch_failure(&batch, "internal", Some(description.clone()));
        fail_batch_with_error(batch, &error);
    }
}
fn record_batch_failure(
&self,
batch: &ProducerBatch,
reason: &'static str,
broker_error: Option<String>,
) {
let error = broker_error.unwrap_or_else(|| reason.to_owned());
telemetry::record_producer_batch_failed(
&ProducerBatchMetrics {
client_id: &self.config.client_id,
topic: &batch.key.topic,
partition: batch.key.partition,
record_count: batch.records.len(),
duration: batch.created_at.elapsed(),
},
reason,
&error,
);
}
/// Resolves every pending flush waiter with `Ok(())`, but only once nothing remains
/// undrained in the accumulator and no produce request is in flight.
fn maybe_complete_flushes(&mut self) {
    if self.pending_flushes.is_empty() {
        return;
    }
    if self.accumulator.has_undrained() || self.in_flight_produce_count > 0 {
        return;
    }
    let waiters = std::mem::take(&mut self.pending_flushes);
    let completed = waiters.len();
    for waiter in waiters {
        let _ = waiter.send(Ok(()));
    }
    debug!(completed, "completed producer flush waiters");
}
fn fail_pending_flushes(&mut self, _message: &str) {
for reply in std::mem::take(&mut self.pending_flushes) {
let _ = reply.send(Err(Error::Producer(ProducerError::RuntimeStoppedDuring {
operation: "flush",
})));
}
}
/// Drops cached broker connections whose broker id is no longer present in the metadata cache.
fn retain_connections_for_metadata(&mut self) {
    let metadata = &self.metadata;
    self.connections
        .retain(|broker_id, _connection| metadata.contains_broker(*broker_id));
}
/// Finishes a requested commit/abort once all of the transaction's records have been sent.
///
/// Does nothing unless the transaction manager holds a `pending_completion`. Completion is
/// deferred while the accumulator still has undrained batches, or while produce requests are
/// still in flight.
///
/// When ready, it takes the pending completion and sends `EndTxn` via `end_transaction`.
/// The outcome is forwarded to the waiting caller, if any. On failure, the transaction is
/// marked abort-only when the error says an abort is still possible, and fatal otherwise.
async fn maybe_complete_transaction(&mut self) {
    let Some(result) = self.transaction.as_ref().and_then(|transaction| {
        transaction
            .pending_completion
            .as_ref()
            .map(|pending| pending.result)
    }) else {
        return;
    };
    // EndTxn must not overtake this transaction's produce requests. An in-flight batch that
    // later fails has to be able to turn a commit into an abort. This matches the idle
    // condition used by `maybe_complete_flushes` and `finish_shutdown_if_ready`.
    if self.accumulator.has_undrained() || self.in_flight_produce_count > 0 {
        return;
    }
    debug!(
        result = if matches!(result, TransactionResult::Commit) {
            "commit"
        } else {
            "abort"
        },
        "completing producer transaction"
    );
    let reply = self
        .transaction
        .as_mut()
        .and_then(|transaction| transaction.pending_completion.take())
        .and_then(|pending| pending.reply);
    match self.end_transaction(result).await {
        Ok(()) => {
            debug!("producer transaction completed");
            if let Some(reply) = reply {
                let _ = reply.send(Ok(()));
            }
        }
        Err(error) => {
            let message = format!("{error:#}");
            if error.transaction_abort_required() {
                self.mark_transaction_abort_only(message.clone());
            } else {
                self.mark_transaction_fatal(message.clone());
            }
            warn!(error = %message, "producer transaction completion failed");
            if let Some(reply) = reply {
                let _ = reply.send(Err(error));
            }
        }
    }
}
async fn end_transaction(&mut self, result: TransactionResult) -> ClientResult<()> {
let transactional_id = self.transactional_id()?.to_owned();
let producer = self
.transaction
.as_ref()
.and_then(|transaction| transaction.producer)
.ok_or(ProducerError::TransactionalProducerNotInitialized {
operation: "end_transaction",
})?;
let version = {
let connection = self.transaction_coordinator_connection().await?;
let version = connection.version_with_cap::<EndTxnRequest>(END_TXN_VERSION_CAP)?;
if version < END_TXN_TRANSACTION_V2_MIN_VERSION {
return Err(ProtocolError::UnsupportedApiVersion {
api: "EndTxn",
min_version: END_TXN_TRANSACTION_V2_MIN_VERSION,
broker_version: version,
}
.into());
}
version
};
let request = EndTxnRequest::default()
.with_transactional_id(TransactionalId(StrBytes::from_string(transactional_id)))
.with_producer_id(ProducerId(producer.id))
.with_producer_epoch(producer.epoch)
.with_committed(matches!(result, TransactionResult::Commit));
let response = {
let client_id = self.config.client_id.clone();
let connection = self.transaction_coordinator_connection().await?;
match connection
.send_request::<EndTxnRequest>(&client_id, version, &request)
.await
{
Ok(response) => response,
Err(error) => {
self.invalidate_transaction_coordinator();
let operation = match result {
TransactionResult::Commit => "commit_transaction",
TransactionResult::Abort => "abort_transaction",
};
return Err(ProducerError::TransactionFatal {
operation,
message: format!("{error:#}"),
}
.into());
}
}
};
if let Some(error) = response.error_code.err() {
if matches!(
error,
ResponseError::CoordinatorNotAvailable | ResponseError::NotCoordinator
) {
self.invalidate_transaction_coordinator();
}
let mut broker_error = BrokerError::response("end_transaction", None::<String>, error);
match (result, classify_transactional_error(error)) {
(_, TransactionFailureDisposition::Fatal) | (TransactionResult::Abort, _) => {
broker_error = broker_error.fatal();
}
(TransactionResult::Commit, TransactionFailureDisposition::AbortOnly) => {
broker_error = broker_error.transaction_abort_required();
}
}
return Err(Error::Broker(broker_error));
}
let next_producer = ProducerIdentity {
id: *response.producer_id,
epoch: response.producer_epoch,
};
let transaction =
self.transaction
.as_mut()
.ok_or(ProducerError::TransactionManagerUnavailable {
operation: "end_transaction",
})?;
transaction.finish_success(next_producer);
Ok(())
}
/// Resolves the transaction coordinator for `transactional_id` through a bootstrap broker
/// and caches it on the transaction manager.
///
/// Every attempt opens a fresh bootstrap connection and sends `FindCoordinator`. When the
/// response reports `CoordinatorNotAvailable`, `CoordinatorLoadInProgress` or
/// `NotCoordinator` and attempts remain, it sleeps `retry_backoff` and tries again. Up to
/// `max(max_retries, 10) + 1` attempts are made. On success, any cached coordinator
/// connection is dropped so the next use connects to the resolved address.
///
/// # Errors
/// Bootstrap connection and request failures propagate immediately.
/// `parse_find_coordinator_response` errors propagate as well, including a coordinator
/// error on the last attempt.
async fn find_transaction_coordinator(
    &mut self,
    transactional_id: &str,
) -> ClientResult<TransactionCoordinator> {
    let attempts = self.config.max_retries.max(10);
    for attempt in 0..=attempts {
        let mut bootstrap = connect_to_any_bootstrap(
            &self.config.bootstrap_servers,
            &self.config.client_id,
            self.config.request_timeout,
            self.config.security_protocol,
            &self.config.tls,
            &self.config.sasl,
            &self.config.tcp_connector,
        )
        .await?;
        let version =
            bootstrap.version_with_cap::<FindCoordinatorRequest>(FIND_COORDINATOR_VERSION_CAP)?;
        let response: FindCoordinatorResponse = bootstrap
            .send_request::<FindCoordinatorRequest>(
                &self.config.client_id,
                version,
                &build_find_coordinator_request(transactional_id, version),
            )
            .await?;
        let coordinator_unavailable = matches!(
            find_coordinator_error(&response, version),
            Some(
                ResponseError::CoordinatorNotAvailable
                    | ResponseError::CoordinatorLoadInProgress
                    | ResponseError::NotCoordinator
            )
        );
        if coordinator_unavailable && attempt < attempts {
            sleep(self.config.retry_backoff).await;
            continue;
        }
        let coordinator = parse_find_coordinator_response(response, version)?;
        self.transaction_coordinator_connection = None;
        if let Some(transaction) = &mut self.transaction {
            transaction.coordinator = Some(coordinator.clone());
        }
        debug!(
            %transactional_id,
            coordinator = %coordinator.address,
            "resolved transaction coordinator"
        );
        return Ok(coordinator);
    }
    Err(ProducerError::AttemptsExhausted {
        operation: "find_transaction_coordinator",
        resource: format!(" for transactional_id '{transactional_id}'"),
        attempts: attempts + 1,
    }
    .into())
}
/// Resolves the consumer-group coordinator for `group_id` through a bootstrap broker.
///
/// The retry policy matches the transaction-coordinator lookup. Every attempt opens a fresh
/// bootstrap connection. `CoordinatorNotAvailable`, `CoordinatorLoadInProgress` and
/// `NotCoordinator` are retried after `retry_backoff`, for up to `max(max_retries, 10) + 1`
/// attempts. Unlike the transaction lookup, the result is not cached anywhere.
///
/// # Errors
/// Bootstrap connection and request failures propagate immediately.
/// `parse_find_coordinator_response` errors propagate as well, including a coordinator
/// error on the last attempt.
async fn find_group_coordinator(
    &mut self,
    group_id: &str,
) -> ClientResult<TransactionCoordinator> {
    let attempts = self.config.max_retries.max(10);
    for attempt in 0..=attempts {
        let mut bootstrap = connect_to_any_bootstrap(
            &self.config.bootstrap_servers,
            &self.config.client_id,
            self.config.request_timeout,
            self.config.security_protocol,
            &self.config.tls,
            &self.config.sasl,
            &self.config.tcp_connector,
        )
        .await?;
        let version =
            bootstrap.version_with_cap::<FindCoordinatorRequest>(FIND_COORDINATOR_VERSION_CAP)?;
        let response: FindCoordinatorResponse = bootstrap
            .send_request::<FindCoordinatorRequest>(
                &self.config.client_id,
                version,
                &build_group_find_coordinator_request(group_id, version),
            )
            .await?;
        let coordinator_unavailable = matches!(
            find_coordinator_error(&response, version),
            Some(
                ResponseError::CoordinatorNotAvailable
                    | ResponseError::CoordinatorLoadInProgress
                    | ResponseError::NotCoordinator
            )
        );
        if coordinator_unavailable && attempt < attempts {
            sleep(self.config.retry_backoff).await;
            continue;
        }
        let coordinator = parse_find_coordinator_response(response, version)?;
        debug!(%group_id, coordinator = %coordinator.address, "resolved group coordinator");
        return Ok(coordinator);
    }
    Err(ProducerError::AttemptsExhausted {
        operation: "find_group_coordinator",
        resource: format!(" for group '{group_id}'"),
        attempts: attempts + 1,
    }
    .into())
}
/// Obtains a producer id and epoch for `transactional_id` from its transaction coordinator
/// via `InitProducerId`.
///
/// Before each attempt, the coordinator connection is checked with
/// `ensure_transaction_v2_feature` and must support at least `INIT_PRODUCER_ID_MIN_VERSION`.
/// Up to `max(max_retries, 30) + 1` attempts are made:
/// - A transport failure invalidates the cached coordinator and retries after `retry_backoff`.
///   On the last attempt it marks the transaction fatal and returns
///   `ProducerError::TransactionFatal`.
/// - Error codes accepted by `should_retry_init_transactional_producer_error` are retried.
///   Coordinator-location errors also invalidate the cached coordinator first.
/// - Any other error code, or a retriable one on the last attempt, is classified with
///   `classify_transactional_error`. The transaction is marked abort-only or fatal
///   accordingly, and the result is returned as `Error::Broker`.
///
/// NOTE(review): the version-check error here is `ProducerError::UnsupportedApiVersion`,
/// while `end_transaction` uses `ProtocolError::UnsupportedApiVersion`. Confirm which
/// one callers expect.
async fn init_transactional_producer(
&mut self,
transactional_id: &str,
transaction_timeout: Duration,
) -> ClientResult<ProducerIdentity> {
let attempts = self.config.max_retries.max(30);
for attempt in 0..=attempts {
let version = {
let connection = self.transaction_coordinator_connection().await?;
ensure_transaction_v2_feature(connection)?;
let version = connection
.version_with_cap::<InitProducerIdRequest>(INIT_PRODUCER_ID_VERSION_CAP)?;
if version < INIT_PRODUCER_ID_MIN_VERSION {
return Err(ProducerError::UnsupportedApiVersion {
api: "InitProducerId",
min_version: INIT_PRODUCER_ID_MIN_VERSION,
broker_version: version,
}
.into());
}
version
};
// A fresh producer id is requested on every attempt (no existing id/epoch is presented).
let request = InitProducerIdRequest::default()
.with_transactional_id(Some(TransactionalId(StrBytes::from_string(
transactional_id.to_owned(),
))))
.with_transaction_timeout_ms(duration_to_i32_ms(transaction_timeout)?)
.with_producer_id(ProducerId(NO_PRODUCER_ID))
.with_producer_epoch(NO_PRODUCER_EPOCH);
let response: InitProducerIdResponse = {
let client_id = self.config.client_id.clone();
let connection = self.transaction_coordinator_connection().await?;
match connection
.send_request::<InitProducerIdRequest>(&client_id, version, &request)
.await
{
Ok(response) => response,
// Transport failure with attempts left: force coordinator re-resolution and retry.
Err(error) if attempt < attempts => {
self.invalidate_transaction_coordinator();
warn!(
error = %format!("{error:#}"),
"retrying InitProducerId after coordinator transport failure"
);
sleep(self.config.retry_backoff).await;
continue;
}
Err(error) => {
let message = format!("{error:#}");
self.mark_transaction_fatal(message.clone());
return Err(ProducerError::TransactionFatal {
operation: "init_transactions",
message,
}
.into());
}
}
};
if let Some(error) = response.error_code.err() {
if attempt < attempts && should_retry_init_transactional_producer_error(error) {
// Only coordinator-location errors require looking the coordinator up again;
// ConcurrentTransactions is retried against the same coordinator.
if matches!(
error,
ResponseError::NotCoordinator
| ResponseError::CoordinatorNotAvailable
| ResponseError::CoordinatorLoadInProgress
) {
self.invalidate_transaction_coordinator();
}
sleep(self.config.retry_backoff).await;
continue;
}
let mut broker_error = BrokerError::response(
"init_transactional_producer",
Some(transactional_id.to_owned()),
error,
);
let message = broker_error.to_string();
match classify_transactional_error(error) {
TransactionFailureDisposition::AbortOnly => {
broker_error = broker_error.transaction_abort_required();
self.mark_transaction_abort_only(message);
}
TransactionFailureDisposition::Fatal => {
broker_error = broker_error.fatal();
self.mark_transaction_fatal(message);
}
}
return Err(Error::Broker(broker_error));
}
return Ok(ProducerIdentity {
id: *response.producer_id,
epoch: response.producer_epoch,
});
}
// Unreachable in practice: the final attempt always returns above.
Err(ProducerError::AttemptsExhausted {
operation: "init_transactions",
resource: format!(" for transactional_id '{transactional_id}'"),
attempts: attempts + 1,
}
.into())
}
/// Obtains a producer id and epoch for a non-transactional idempotent producer via
/// `InitProducerId` with no transactional id.
///
/// Each attempt uses a fresh bootstrap connection. Retriable broker error codes are retried
/// after `retry_backoff`, for up to `max(max_retries, 10) + 1` attempts.
///
/// # Errors
/// - Connection and request failures propagate immediately.
/// - `ProducerError::UnsupportedApiVersion` when the broker's version is below
///   `INIT_PRODUCER_ID_MIN_VERSION`.
/// - `Error::Broker` for a non-retriable error code, or any error on the last attempt.
async fn init_idempotent_producer(&mut self) -> ClientResult<ProducerIdentity> {
    let attempts = self.config.max_retries.max(10);
    for attempt in 0..=attempts {
        let mut bootstrap = connect_to_any_bootstrap(
            &self.config.bootstrap_servers,
            &self.config.client_id,
            self.config.request_timeout,
            self.config.security_protocol,
            &self.config.tls,
            &self.config.sasl,
            &self.config.tcp_connector,
        )
        .await?;
        let version =
            bootstrap.version_with_cap::<InitProducerIdRequest>(INIT_PRODUCER_ID_VERSION_CAP)?;
        if version < INIT_PRODUCER_ID_MIN_VERSION {
            return Err(ProducerError::UnsupportedApiVersion {
                api: "InitProducerId",
                min_version: INIT_PRODUCER_ID_MIN_VERSION,
                broker_version: version,
            }
            .into());
        }
        let request = InitProducerIdRequest::default()
            .with_transactional_id(None)
            .with_transaction_timeout_ms(duration_to_i32_ms(self.config.transaction_timeout)?)
            .with_producer_id(ProducerId(NO_PRODUCER_ID))
            .with_producer_epoch(NO_PRODUCER_EPOCH);
        let response: InitProducerIdResponse = bootstrap
            .send_request::<InitProducerIdRequest>(&self.config.client_id, version, &request)
            .await?;
        match response.error_code.err() {
            None => {
                return Ok(ProducerIdentity {
                    id: *response.producer_id,
                    epoch: response.producer_epoch,
                });
            }
            Some(error) if attempt < attempts && error.is_retriable() => {
                sleep(self.config.retry_backoff).await;
            }
            Some(error) => {
                return Err(Error::Broker(BrokerError::response(
                    "init_idempotent_producer",
                    None::<String>,
                    error,
                )));
            }
        }
    }
    Err(ProducerError::AttemptsExhausted {
        operation: "init_idempotent_producer",
        resource: String::new(),
        attempts: attempts + 1,
    }
    .into())
}
/// Returns a mutable handle to the connection to the current transaction coordinator,
/// connecting first when necessary.
///
/// If no coordinator is cached on the transaction manager, it is resolved with
/// `find_transaction_coordinator`. A new connection is opened when none is cached, or when
/// the cached connection belongs to a different coordinator (broker id or address differ).
///
/// # Errors
/// - `ProducerError::NotTransactional` when the producer has no transaction manager.
/// - Coordinator-lookup and connection errors.
/// - `ProducerError::TransactionCoordinatorConnectionMissing` if, unexpectedly, no
///   connection is cached after the reconnect step.
async fn transaction_coordinator_connection(&mut self) -> ClientResult<&mut BrokerConnection> {
if self.transaction.is_none() {
return Err(ProducerError::NotTransactional.into());
}
let transactional_id = self.transactional_id()?.to_owned();
let coordinator = match self
.transaction
.as_ref()
.and_then(|transaction| transaction.coordinator.clone())
{
Some(coordinator) => coordinator,
None => self.find_transaction_coordinator(&transactional_id).await?,
};
// Reuse the cached connection only if it points at exactly this coordinator.
let reconnect = !matches!(
self.transaction_coordinator_connection.as_ref(),
Some(existing)
if existing.broker_id == coordinator.broker_id && existing.address == coordinator.address
);
if reconnect {
let connection = BrokerConnection::connect_with_transport(
&coordinator.address,
&self.config.client_id,
self.config.request_timeout,
self.config.security_protocol,
&self.config.tls,
&self.config.sasl,
&self.config.tcp_connector,
)
.await?;
self.transaction_coordinator_connection = Some(TransactionCoordinatorConnection {
broker_id: coordinator.broker_id,
address: coordinator.address.clone(),
connection,
});
trace!(
broker_id = coordinator.broker_id,
address = %coordinator.address,
"opened transaction coordinator connection"
);
}
self.transaction_coordinator_connection
.as_mut()
.map(|coordinator| &mut coordinator.connection)
.ok_or(ProducerError::TransactionCoordinatorConnectionMissing.into())
}
/// Forgets both the cached coordinator connection and the coordinator recorded on the
/// transaction manager, so the next coordinator request performs a fresh lookup.
fn invalidate_transaction_coordinator(&mut self) {
    self.transaction_coordinator_connection.take();
    if let Some(transaction) = &mut self.transaction {
        transaction.coordinator = None;
    }
    debug!("invalidated transaction coordinator connection");
}
/// Hands `message` to `TransactionManager::mark_abort_only` for the active transaction,
/// which decides the resulting state.
///
/// The warning now includes the triggering reason. It is emitted only when a transaction
/// exists: previously a state change was logged even for non-transactional producers,
/// where nothing changed.
fn mark_transaction_abort_only(&mut self, message: String) {
    let Some(transaction) = self.transaction.as_mut() else {
        return;
    };
    // Log before `message` is moved into the transaction manager.
    warn!(reason = %message, "producer transaction moved to abort-only state");
    transaction.mark_abort_only(message);
}
/// Hands `message` to `TransactionManager::mark_fatal` for the active transaction.
///
/// The warning now includes the triggering reason. It is emitted only when a transaction
/// exists: previously a state change was logged even for non-transactional producers,
/// where nothing changed.
fn mark_transaction_fatal(&mut self, message: String) {
    let Some(transaction) = self.transaction.as_mut() else {
        return;
    };
    // Log before `message` is moved into the transaction manager.
    warn!(reason = %message, "producer transaction moved to fatal state");
    transaction.mark_fatal(message);
}
/// Returns the transactional id when `TransactionManager::transactional_id_if_active`
/// reports one. Returns `None` for non-transactional producers.
fn transactional_id_if_active(&self) -> Option<&str> {
    match &self.transaction {
        Some(transaction) => TransactionManager::transactional_id_if_active(transaction),
        None => None,
    }
}
/// Returns the configured transactional id.
///
/// # Errors
/// `ProducerError::NotTransactional` when the producer has no transaction manager.
fn transactional_id(&self) -> ClientResult<&str> {
    let Some(transaction) = self.transaction.as_ref() else {
        return Err(ProducerError::NotTransactional.into());
    };
    Ok(transaction.transactional_id.as_str())
}
fn finish_shutdown_if_ready(&mut self) -> bool {
let has_pending_txn_completion = self
.transaction
.as_ref()
.and_then(|transaction| transaction.pending_completion.as_ref())
.is_some();
if !self.shutting_down
|| self.accumulator.has_undrained()
|| self.in_flight_produce_count > 0
|| has_pending_txn_completion
{
return true;
}
if let Some(reply) = self.close_reply.take() {
let result = self
.transaction
.as_ref()
.map(|transaction| transaction.shutdown_result().map_err(Error::from))
.unwrap_or(Ok(()));
let _ = reply.send(result);
}
false
}
/// Computes how long the runtime loop may sleep before it has work to do.
///
/// The checks run in this order:
/// 1. A blocked head-of-queue append dictates the wait.
/// 2. Queued application events mean no wait.
/// 3. Batches that are ready to send, or need leader discovery, mean no wait.
/// 4. Otherwise the result is the minimum of 250 ms, the next metadata refresh, the next
///    batch expiration (if any) and the accumulator's next readiness check.
fn next_wakeup(&self) -> Duration {
    if let Some(blocked) = self.blocked_append_wakeup() {
        return blocked;
    }
    if !self.application_events.is_empty() {
        return Duration::ZERO;
    }
    let refresh_in = self.metadata.next_refresh_in(self.config.metadata_max_age);
    let expiration_in = self
        .accumulator
        .next_expiration_in(self.config.delivery_timeout);
    let ready = self
        .accumulator
        .ready(self.metadata.cache(), &self.config, self.shutting_down);
    let has_sendable_work =
        !ready.ready_by_leader.is_empty() || !ready.unknown_leaders.is_empty();
    if has_sendable_work {
        return Duration::ZERO;
    }
    let baseline = Duration::from_millis(250).min(refresh_in);
    let with_expiration = match expiration_in {
        Some(expiration) => baseline.min(expiration),
        None => baseline,
    };
    with_expiration.min(ready.next_ready_check_delay)
}
/// Returns a wake-up delay when the oldest application event is an append that does not fit
/// in `buffer_memory`.
///
/// The delay is the smaller of the time left before `max_block` elapses for that append and
/// the accumulator's next readiness check. Returns `None` when the queue is empty, the head
/// is not an append, or the record fits.
fn blocked_append_wakeup(&self) -> Option<Duration> {
    let Some(ProducerRuntimeEvent::AppendRecord {
        record,
        enqueued_at,
        ..
    }) = self.application_events.front()
    else {
        return None;
    };
    let record_size = estimate_record_size(record);
    let projected_bytes = self.accumulator.estimated_bytes().saturating_add(record_size);
    if projected_bytes <= self.config.buffer_memory {
        return None;
    }
    let remaining_block = self.config.max_block.saturating_sub(enqueued_at.elapsed());
    let ready = self
        .accumulator
        .ready(self.metadata.cache(), &self.config, self.shutting_down);
    Some(remaining_block.min(ready.next_ready_check_delay))
}
}
/// Reports whether an `InitProducerId` error for a transactional producer is transient and
/// worth retrying.
///
/// The transient errors are coordinator relocation/loading (`NotCoordinator`,
/// `CoordinatorNotAvailable`, `CoordinatorLoadInProgress`) and an in-progress previous
/// transaction (`ConcurrentTransactions`).
fn should_retry_init_transactional_producer_error(error: ResponseError) -> bool {
    match error {
        ResponseError::NotCoordinator
        | ResponseError::CoordinatorNotAvailable
        | ResponseError::CoordinatorLoadInProgress
        | ResponseError::ConcurrentTransactions => true,
        _ => false,
    }
}
// Unit tests for `Sender` event handling and produce-error recovery. Despite the module
// name, only the first two tests cover cancellation. The others exercise
// `handle_kafka_failure` branches and the InitProducerId retry predicate.
#[cfg(test)]
mod cancellation_tests {
use super::*;
use crate::CancellationToken;
use crate::config::ProducerConfig;
// A flush whose token is already cancelled must be answered with `Error::Cancelled`
// without being registered as a pending flush waiter.
#[tokio::test]
async fn cancelled_flush_event_is_abandoned_before_runtime_work() {
let mut sender = Sender::new(ProducerConfig::new("localhost:9092"));
let cancellation = CancellationToken::new();
cancellation.cancel();
let (reply, result) = oneshot::channel();
sender
.application_events
.push_back(ProducerRuntimeEvent::Flush {
cancellation: Some(cancellation),
reply,
});
sender.process_application_events().await;
assert!(sender.pending_flushes.is_empty());
assert!(matches!(result.await, Ok(Err(Error::Cancelled))));
}
// A SendOffsetsToTransaction event cancelled after being queued must be answered with
// `Error::Cancelled` and must not remain in the application event queue.
#[tokio::test]
async fn cancelled_offsets_to_transaction_requeued_work_is_abandoned() {
let mut sender = Sender::new(ProducerConfig::new("localhost:9092"));
let cancellation = CancellationToken::new();
let (reply, result) = oneshot::channel();
sender
.application_events
.push_back(ProducerRuntimeEvent::SendOffsetsToTransaction {
offsets: vec![CommitOffset {
topic: "topic-a".to_owned(),
partition: 0,
offset: 1,
}],
group_metadata: ConsumerGroupMetadata::new("group-a"),
cancellation: Some(cancellation.clone()),
reply,
});
cancellation.cancel();
sender.process_application_events().await;
assert!(matches!(result.await, Ok(Err(Error::Cancelled))));
assert!(sender.application_events.is_empty());
}
// MessageTooLarge on a two-record batch splits it back into the accumulator as two batches
// without resolving either caller's reply.
#[test]
fn message_too_large_splits_multi_record_batch_without_failing_replies() {
let mut sender = Sender::new(ProducerConfig::new("localhost:9092"));
let key = TopicPartitionKey::new("topic-a".to_owned(), 0);
let (reply1, mut result1) = oneshot::channel();
let (reply2, mut result2) = oneshot::channel();
sender.accumulator.append(
crate::types::ProduceRecord::new("topic-a", 0, b"first".as_slice()),
reply1,
1024,
);
sender.accumulator.append(
crate::types::ProduceRecord::new("topic-a", 0, b"second".as_slice()),
reply2,
1024,
);
let batch = sender
.accumulator
.drain_unknown_leader(vec![key.clone()])
.pop()
.unwrap();
sender.handle_kafka_failure(1, batch, ResponseError::MessageTooLarge);
let stats = sender.accumulator.stats();
assert_eq!(stats.batch_count, 2);
assert_eq!(stats.record_count, 2);
assert!(matches!(
result1.try_recv(),
Err(oneshot::error::TryRecvError::Empty)
));
assert!(matches!(
result2.try_recv(),
Err(oneshot::error::TryRecvError::Empty)
));
}
// DuplicateSequenceNumber means the broker already has the records: the reply resolves
// successfully with unknown (-1) offsets and nothing is re-enqueued.
#[tokio::test]
async fn duplicate_sequence_number_completes_idempotent_batch_as_success() {
let mut sender = Sender::new(ProducerConfig::new("localhost:9092"));
let (batch, mut result) = idempotent_batch("topic-a", 0, 11);
sender.handle_kafka_failure(1, batch, ResponseError::DuplicateSequenceNumber);
let ack = result.try_recv().unwrap().unwrap();
assert_eq!(ack.base_offset, -1);
assert_eq!(ack.offset, -1);
assert!(!sender.accumulator.has_undrained());
}
// UnknownProducerId clears the idempotent producer and the partition's sequence, and
// re-enqueues the batch without producer state with its attempt count bumped.
#[test]
fn unknown_producer_id_resets_idempotent_state_and_retries_batch() {
let mut sender = Sender::new(ProducerConfig::new("localhost:9092"));
let key = TopicPartitionKey::new("topic-a".to_owned(), 0);
sender.idempotent_producer = Some(ProducerIdentity { id: 7, epoch: 1 });
sender.idempotent_sequences.insert(key.clone(), 12);
let (batch, mut result) = idempotent_batch("topic-a", 0, 11);
sender.handle_kafka_failure(1, batch, ResponseError::UnknownProducerId);
assert!(sender.idempotent_producer.is_none());
assert!(!sender.idempotent_sequences.contains_key(&key));
assert!(matches!(
result.try_recv(),
Err(oneshot::error::TryRecvError::Empty)
));
let retried = sender
.accumulator
.drain_unknown_leader(vec![key])
.pop()
.unwrap();
assert_eq!(retried.attempts, 1);
assert!(retried.producer_state.is_none());
}
// OutOfOrderSequenceNumber follows the same reset-and-retry path as UnknownProducerId.
#[test]
fn out_of_order_sequence_resets_idempotent_state_and_retries_batch() {
let mut sender = Sender::new(ProducerConfig::new("localhost:9092"));
let key = TopicPartitionKey::new("topic-a".to_owned(), 0);
sender.idempotent_producer = Some(ProducerIdentity { id: 7, epoch: 1 });
sender.idempotent_sequences.insert(key.clone(), 12);
let (batch, mut result) = idempotent_batch("topic-a", 0, 11);
sender.handle_kafka_failure(1, batch, ResponseError::OutOfOrderSequenceNumber);
assert!(sender.idempotent_producer.is_none());
assert!(!sender.idempotent_sequences.contains_key(&key));
assert!(matches!(
result.try_recv(),
Err(oneshot::error::TryRecvError::Empty)
));
let retried = sender
.accumulator
.drain_unknown_leader(vec![key])
.pop()
.unwrap();
assert_eq!(retried.attempts, 1);
assert!(retried.producer_state.is_none());
}
// A non-retriable error (InvalidRecord) fails the batch with a broker error and still
// resets the partition's idempotent sequence state.
#[tokio::test]
async fn non_retriable_idempotent_failure_resets_sequence_state() {
let mut sender = Sender::new(ProducerConfig::new("localhost:9092"));
let key = TopicPartitionKey::new("topic-a".to_owned(), 0);
sender.idempotent_producer = Some(ProducerIdentity { id: 7, epoch: 1 });
sender.idempotent_sequences.insert(key.clone(), 12);
let (batch, result) = idempotent_batch("topic-a", 0, 11);
sender.handle_kafka_failure(1, batch, ResponseError::InvalidRecord);
assert!(sender.idempotent_producer.is_none());
assert!(!sender.idempotent_sequences.contains_key(&key));
assert!(matches!(result.await, Ok(Err(Error::Broker(_)))));
}
// Pins which InitProducerId error codes are treated as transient.
#[test]
fn init_transactional_producer_retries_concurrent_transactions() {
assert!(should_retry_init_transactional_producer_error(
ResponseError::ConcurrentTransactions
));
assert!(should_retry_init_transactional_producer_error(
ResponseError::CoordinatorLoadInProgress
));
assert!(!should_retry_init_transactional_producer_error(
ResponseError::ProducerFenced
));
assert!(!should_retry_init_transactional_producer_error(
ResponseError::InvalidTransactionTimeout
));
}
// Builds a one-record batch for `topic`/`partition` stamped with non-transactional
// idempotent producer state (producer id 7, epoch 1, the given base sequence), and returns
// it together with the receiver for its reply.
fn idempotent_batch(
topic: &str,
partition: i32,
base_sequence: i32,
) -> (ProducerBatch, oneshot::Receiver<ClientResult<ProduceAck>>) {
let key = TopicPartitionKey::new(topic.to_owned(), partition);
let (reply, result) = oneshot::channel();
let batch = ProducerBatch {
key,
records: vec![crate::types::ProduceRecord::new(
topic,
partition,
b"value".as_slice(),
)],
replies: vec![reply],
created_at: Instant::now(),
estimated_bytes: 64,
attempts: 0,
retry_at: None,
producer_state: Some(BatchProducerState {
producer_id: 7,
producer_epoch: 1,
base_sequence,
transactional: false,
}),
};
(batch, result)
}
}