mod accumulator;
mod partitioner;
mod request;
mod sender;
mod transaction;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tracing::{debug, instrument};
use self::sender::Sender;
use crate::config::ProducerConfig;
use crate::types::{CommitOffset, ConsumerGroupMetadata, KafkaMessage, ProduceAck, ProduceRecord};
use crate::{CancellationToken, Error, ProducerError, Result};
pub struct KafkaProducer {
producer_runtime: ProducerRuntime,
join: JoinHandle<()>,
default_topic: Option<String>,
default_partition: Option<i32>,
}
impl KafkaProducer {
#[instrument(
name = "producer.connect",
level = "debug",
skip(config),
fields(
bootstrap_server_count = config.bootstrap_servers.len(),
client_id = %config.client_id,
transactional = config.is_transactional(),
acks = config.acks
)
)]
pub async fn connect(config: ProducerConfig) -> Result<Self> {
if config.is_transactional() && config.acks != -1 {
return Err(ProducerError::TransactionalRequiresAcksAll.into());
}
if config.is_idempotent() && config.acks != -1 {
return Err(ProducerError::IdempotenceRequiresAcksAll.into());
}
if config.is_idempotent() && config.max_retries == 0 {
return Err(ProducerError::IdempotenceRequiresRetries.into());
}
let (tx, rx) = mpsc::channel(64);
let sender = Sender::new(config);
let join = tokio::spawn(async move {
sender.run(rx).await;
});
let producer = Self {
producer_runtime: ProducerRuntime::new(tx),
join,
default_topic: None,
default_partition: None,
};
if let Err(error) = producer.warm_up().await {
producer.join.abort();
return Err(error);
}
debug!("producer connected");
Ok(producer)
}
pub fn with_defaults(
mut self,
default_topic: Option<String>,
default_partition: Option<i32>,
) -> Self {
self.default_topic = default_topic;
self.default_partition = default_partition;
self
}
pub async fn send_message(&self, message: KafkaMessage) -> Result<ProduceAck> {
self.send_message_with_cancellation(message, None).await
}
pub async fn send_message_with_cancellation(
&self,
message: KafkaMessage,
cancellation: Option<CancellationToken>,
) -> Result<ProduceAck> {
let record = message
.into_record(self.default_topic.as_deref(), self.default_partition)
.map_err(Error::from)?;
self.send_record_with_cancellation(record, cancellation)
.await
}
pub async fn send_value(&self, value: impl Into<bytes::Bytes>) -> Result<ProduceAck> {
self.send(value).await
}
pub async fn send_to(
&self,
topic: impl Into<String>,
value: impl Into<bytes::Bytes>,
) -> Result<ProduceAck> {
let message = KafkaMessage::new(value).with_topic(topic);
self.send_message(message).await
}
#[instrument(name = "producer.send", level = "debug", skip(self, input))]
pub async fn send(&self, input: impl Into<KafkaMessage>) -> Result<ProduceAck> {
self.send_with_cancellation(input, None).await
}
pub async fn send_with_cancellation(
&self,
input: impl Into<KafkaMessage>,
cancellation: Option<CancellationToken>,
) -> Result<ProduceAck> {
let record = input
.into()
.into_record(self.default_topic.as_deref(), self.default_partition)
.map_err(Error::from)?;
self.send_record_with_cancellation(record, cancellation)
.await
}
#[instrument(
name = "producer.send_record",
level = "debug",
skip(self, record),
fields(
topic = %record.topic,
partition = record.partition,
value_len = record.value.as_ref().map(|value| value.len()).unwrap_or(0),
tombstone = record.value.is_none(),
headers = record.headers.len(),
has_key = record.key.is_some(),
has_timestamp = record.timestamp.is_some()
)
)]
async fn send_record_with_cancellation(
&self,
record: ProduceRecord,
cancellation: Option<CancellationToken>,
) -> Result<ProduceAck> {
let (reply_tx, reply_rx) = oneshot::channel();
self.producer_runtime
.send(
ProducerRuntimeEvent::AppendRecord {
record,
cancellation,
reply: reply_tx,
},
ProducerError::ThreadStoppedBefore {
operation: "accepting the produce request",
},
)
.await?;
reply_rx
.await
.map_err(|_| ProducerError::ThreadStoppedDuring {
operation: "answering the produce request",
})?
}
#[instrument(name = "producer.flush", level = "debug", skip(self))]
pub async fn flush(&self) -> Result<()> {
self.flush_with_cancellation(None).await
}
pub async fn flush_with_cancellation(
&self,
cancellation: Option<CancellationToken>,
) -> Result<()> {
let (reply_tx, reply_rx) = oneshot::channel();
self.producer_runtime
.send(
ProducerRuntimeEvent::Flush {
cancellation,
reply: reply_tx,
},
ProducerError::ThreadStoppedBefore {
operation: "flushing buffered records",
},
)
.await?;
reply_rx
.await
.map_err(|_| ProducerError::ThreadStoppedDuring { operation: "flush" })?
}
#[instrument(name = "producer.init_transactions", level = "debug", skip(self))]
pub async fn init_transactions(&self) -> Result<()> {
self.init_transactions_with_cancellation(None).await
}
pub async fn init_transactions_with_cancellation(
&self,
cancellation: Option<CancellationToken>,
) -> Result<()> {
let (reply_tx, reply_rx) = oneshot::channel();
self.producer_runtime
.send(
ProducerRuntimeEvent::InitTransactions {
cancellation,
reply: reply_tx,
},
ProducerError::ThreadStoppedBefore {
operation: "initializing transactions",
},
)
.await?;
reply_rx
.await
.map_err(|_| ProducerError::ThreadStoppedDuring {
operation: "initializing transactions",
})?
}
#[instrument(name = "producer.begin_transaction", level = "debug", skip(self))]
pub async fn begin_transaction(&self) -> Result<()> {
self.begin_transaction_with_cancellation(None).await
}
pub async fn begin_transaction_with_cancellation(
&self,
cancellation: Option<CancellationToken>,
) -> Result<()> {
let (reply_tx, reply_rx) = oneshot::channel();
self.producer_runtime
.send(
ProducerRuntimeEvent::BeginTransaction {
cancellation,
reply: reply_tx,
},
ProducerError::ThreadStoppedBefore {
operation: "beginning the transaction",
},
)
.await?;
reply_rx
.await
.map_err(|_| ProducerError::ThreadStoppedDuring {
operation: "beginning the transaction",
})?
}
#[instrument(name = "producer.commit_transaction", level = "debug", skip(self))]
pub async fn commit_transaction(&self) -> Result<()> {
self.commit_transaction_with_cancellation(None).await
}
pub async fn commit_transaction_with_cancellation(
&self,
cancellation: Option<CancellationToken>,
) -> Result<()> {
let (reply_tx, reply_rx) = oneshot::channel();
self.producer_runtime
.send(
ProducerRuntimeEvent::CommitTransaction {
cancellation,
reply: reply_tx,
},
ProducerError::ThreadStoppedBefore {
operation: "committing the transaction",
},
)
.await?;
reply_rx
.await
.map_err(|_| ProducerError::ThreadStoppedDuring {
operation: "committing the transaction",
})?
}
#[instrument(name = "producer.abort_transaction", level = "debug", skip(self))]
pub async fn abort_transaction(&self) -> Result<()> {
self.abort_transaction_with_cancellation(None).await
}
pub async fn abort_transaction_with_cancellation(
&self,
cancellation: Option<CancellationToken>,
) -> Result<()> {
let (reply_tx, reply_rx) = oneshot::channel();
self.producer_runtime
.send(
ProducerRuntimeEvent::AbortTransaction {
cancellation,
reply: reply_tx,
},
ProducerError::ThreadStoppedBefore {
operation: "aborting the transaction",
},
)
.await?;
reply_rx
.await
.map_err(|_| ProducerError::ThreadStoppedDuring {
operation: "aborting the transaction",
})?
}
#[instrument(
name = "producer.send_offsets_to_transaction",
level = "debug",
skip(self, offsets, group_metadata),
fields(
offset_count = offsets.len(),
group_id = %group_metadata.group_id,
generation_id = group_metadata.generation_id,
has_instance_id = group_metadata.group_instance_id.is_some()
)
)]
pub async fn send_offsets_to_transaction(
&self,
offsets: Vec<CommitOffset>,
group_metadata: ConsumerGroupMetadata,
) -> Result<()> {
self.send_offsets_to_transaction_with_cancellation(offsets, group_metadata, None)
.await
}
pub async fn send_offsets_to_transaction_with_cancellation(
&self,
offsets: Vec<CommitOffset>,
group_metadata: ConsumerGroupMetadata,
cancellation: Option<CancellationToken>,
) -> Result<()> {
if offsets.is_empty() {
return Ok(());
}
let (reply_tx, reply_rx) = oneshot::channel();
self.producer_runtime
.send(
ProducerRuntimeEvent::SendOffsetsToTransaction {
offsets,
group_metadata,
cancellation,
reply: reply_tx,
},
ProducerError::ThreadStoppedBefore {
operation: "sending offsets to the transaction",
},
)
.await?;
reply_rx
.await
.map_err(|_| ProducerError::ThreadStoppedDuring {
operation: "send_offsets_to_transaction",
})?
}
#[instrument(name = "producer.shutdown", level = "debug", skip(self))]
pub async fn shutdown(self) -> Result<()> {
let (reply_tx, reply_rx) = oneshot::channel();
self.producer_runtime
.send(
ProducerRuntimeEvent::Shutdown { reply: reply_tx },
ProducerError::ThreadStoppedBefore {
operation: "shutdown",
},
)
.await?;
let result = reply_rx
.await
.map_err(|_| ProducerError::ThreadStoppedDuring {
operation: "shutdown",
})?;
self.join.await.map_err(ProducerError::Join)?;
result
}
#[instrument(name = "producer.warm_up", level = "trace", skip(self))]
async fn warm_up(&self) -> Result<()> {
let (reply_tx, reply_rx) = oneshot::channel();
self.producer_runtime
.send(
ProducerRuntimeEvent::WarmUp {
cancellation: None,
reply: reply_tx,
},
ProducerError::ThreadStoppedBefore {
operation: "startup",
},
)
.await?;
reply_rx
.await
.map_err(|_| ProducerError::ThreadStoppedDuring {
operation: "startup",
})?
}
}
struct ProducerRuntime {
tx: mpsc::Sender<ProducerRuntimeEvent>,
}
impl ProducerRuntime {
fn new(tx: mpsc::Sender<ProducerRuntimeEvent>) -> Self {
Self { tx }
}
async fn send(&self, event: ProducerRuntimeEvent, stopped_error: ProducerError) -> Result<()> {
self.tx.send(event).await.map_err(|_| stopped_error.into())
}
}
pub enum ProducerRuntimeEvent {
WarmUp {
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<()>>,
},
BeginTransaction {
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<()>>,
},
InitTransactions {
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<()>>,
},
AppendRecord {
record: ProduceRecord,
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<ProduceAck>>,
},
Flush {
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<()>>,
},
CommitTransaction {
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<()>>,
},
AbortTransaction {
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<()>>,
},
SendOffsetsToTransaction {
offsets: Vec<CommitOffset>,
group_metadata: ConsumerGroupMetadata,
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<Result<()>>,
},
Shutdown {
reply: oneshot::Sender<Result<()>>,
},
}
impl ProducerRuntimeEvent {
pub(crate) fn is_cancelled(&self) -> bool {
match self {
Self::WarmUp { cancellation, .. }
| Self::BeginTransaction { cancellation, .. }
| Self::InitTransactions { cancellation, .. }
| Self::AppendRecord { cancellation, .. }
| Self::Flush { cancellation, .. }
| Self::CommitTransaction { cancellation, .. }
| Self::AbortTransaction { cancellation, .. }
| Self::SendOffsetsToTransaction { cancellation, .. } => cancellation
.as_ref()
.is_some_and(CancellationToken::is_cancelled),
Self::Shutdown { .. } => false,
}
}
pub(crate) fn send_cancelled(self) {
match self {
Self::WarmUp { reply, .. }
| Self::BeginTransaction { reply, .. }
| Self::InitTransactions { reply, .. }
| Self::Flush { reply, .. }
| Self::CommitTransaction { reply, .. }
| Self::AbortTransaction { reply, .. }
| Self::SendOffsetsToTransaction { reply, .. } => {
let _ = reply.send(Err(Error::Cancelled));
}
Self::AppendRecord { reply, .. } => {
let _ = reply.send(Err(Error::Cancelled));
}
Self::Shutdown { .. } => {}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::ProducerConfig;
#[tokio::test]
async fn transactional_connect_requires_acks_all() {
let result = KafkaProducer::connect(
ProducerConfig::new("127.0.0.1:1")
.with_transactional_id("tx-a")
.with_acks(1),
)
.await;
assert!(matches!(
result,
Err(crate::Error::Producer(
ProducerError::TransactionalRequiresAcksAll
))
));
}
#[tokio::test]
async fn idempotent_connect_requires_acks_all() {
let result = KafkaProducer::connect(
ProducerConfig::new("127.0.0.1:1")
.with_enable_idempotence(true)
.with_acks(1),
)
.await;
assert!(matches!(
result,
Err(crate::Error::Producer(
ProducerError::IdempotenceRequiresAcksAll
))
));
}
#[tokio::test]
async fn idempotent_connect_requires_retries() {
let result = KafkaProducer::connect(
ProducerConfig::new("127.0.0.1:1")
.with_enable_idempotence(true)
.with_max_retries(0),
)
.await;
assert!(matches!(
result,
Err(crate::Error::Producer(
ProducerError::IdempotenceRequiresRetries
))
));
}
}