mod batch;
mod builder;
mod config;
mod partitioner;
mod record;
mod transaction;
pub use self::batch::{BatchProducer, BatchProducerBuilder};
pub use self::partitioner::{
DefaultPartitioner, Partitioner, RoundRobinPartitioner, StickyPartitioner, UniformPartitioner,
};
pub use self::record::{AsBytes, Headers, Record};
pub use config::BatchConfig;
pub use transaction::{TransactionalBuilder, TransactionalProducer};
pub use crate::client::{Compression, ProduceConfirm, ProducePartitionConfirm, RequiredAcks};
pub use config::DEFAULT_ACK_TIMEOUT_MILLIS;
pub use config::DEFAULT_REQUIRED_ACKS;
use crate::client::KafkaClientInternals;
use crate::client::{self, KafkaClient};
use crate::error::{Error, Result};
use std::collections::HashMap;
use std::slice::from_ref;
use config::Config;
use partitioner::Topics;
/// Mutable partitioning state owned by a producer.
pub(crate) struct State<P> {
    /// Per-topic partition metadata, keyed by topic name; populated from the
    /// client's current metadata snapshot in `State::new`.
    partitions: HashMap<String, partitioner::Partitions>,
    /// Partitioner invoked for every outgoing record in `send_all` to
    /// assign/adjust the target partition.
    partitioner: P,
}
/// A synchronous Kafka producer.
///
/// Sends records through an owned `KafkaClient`, routing each record through
/// the pluggable partitioner `P` (`DefaultPartitioner` unless overridden via
/// the builder).
pub struct Producer<P = DefaultPartitioner> {
    // Owned connection to the cluster; exposed via client()/client_mut()/into_client().
    client: KafkaClient,
    // Cached partition metadata plus the partitioner instance.
    state: State<P>,
    // Acks/timeout/transactional settings captured at build time.
    config: Config,
}
impl Producer {
    /// Starts building a producer on top of an already-connected client.
    #[must_use]
    pub fn from_client(client: KafkaClient) -> builder::Builder<DefaultPartitioner> {
        Builder::new(Some(client), Vec::new())
    }
    /// Starts building a producer that will bootstrap its own client from
    /// the given broker host list.
    #[must_use]
    pub fn from_hosts(hosts: Vec<String>) -> builder::Builder<DefaultPartitioner> {
        Builder::new(None, hosts)
    }
    /// Borrows the underlying client.
    #[must_use]
    pub fn client(&self) -> &KafkaClient {
        &self.client
    }
    /// Mutably borrows the underlying client (e.g. to refresh metadata).
    pub fn client_mut(&mut self) -> &mut KafkaClient {
        &mut self.client
    }
    /// Destroys this producer, handing back ownership of the client.
    #[must_use]
    pub fn into_client(self) -> KafkaClient {
        self.client
    }
}
impl<P: Partitioner> Producer<P> {
pub fn send<K, V>(&mut self, rec: &Record<K, V>) -> Result<()>
where
K: AsBytes,
V: AsBytes,
{
let mut rs = self.send_all(from_ref(rec))?;
if self.config.required_acks == 0 {
Ok(())
} else {
assert_eq!(1, rs.len());
let mut produce_confirm = rs.pop().unwrap();
assert_eq!(1, produce_confirm.partition_confirms.len());
produce_confirm
.partition_confirms
.pop()
.unwrap()
.offset
.map_err(Error::Kafka)?;
Ok(())
}
}
pub fn send_all<K, V>(&mut self, recs: &[Record<'_, K, V>]) -> Result<Vec<ProduceConfirm>>
where
K: AsBytes,
V: AsBytes,
{
if self.config.transactional_id.is_some() {
return Err(Error::Config(
"transactional_id is configured on Producer; use TransactionalProducer instead"
.into(),
));
}
if self.config.enable_idempotence {
return Err(Error::Config(
"idempotent mode is not supported by Producer; use TransactionalProducer".into(),
));
}
let partitioner = &mut self.state.partitioner;
let partitions = &self.state.partitions;
let client = &mut self.client;
let config = &self.config;
client.internal_produce_messages(
config.required_acks,
config.ack_timeout,
recs.iter().map(|r| {
let mut m = client::ProduceMessage {
key: to_option(r.key.as_bytes()),
value: to_option(r.value.as_bytes()),
topic: r.topic,
partition: r.partition,
headers: &r.headers.0,
};
partitioner.partition(Topics::new(partitions), &mut m);
m
}),
)
}
}
/// Normalizes a byte slice for the wire format: an empty slice becomes
/// `None` (no key/value), anything else is passed through as `Some`.
fn to_option(data: &[u8]) -> Option<&[u8]> {
    match data {
        [] => None,
        bytes => Some(bytes),
    }
}
impl<P> State<P> {
    /// Builds fresh partitioning state from the client's current topic
    /// metadata, pairing it with the supplied partitioner.
    pub(crate) fn new(client: &mut KafkaClient, partitioner: P) -> State<P> {
        // Snapshot, per topic: the currently available partition ids and the
        // total partition count.
        let partitions: HashMap<String, partitioner::Partitions> = client
            .topics()
            .into_iter()
            .map(|topic| {
                let parts = topic.partitions();
                // Partition counts comfortably fit in u32; truncation is not
                // a practical concern here.
                #[allow(clippy::cast_possible_truncation)]
                let num_all_partitions = parts.len() as u32;
                (
                    topic.name().to_owned(),
                    partitioner::Partitions {
                        available_ids: parts.available_ids(),
                        num_all_partitions,
                    },
                )
            })
            .collect();
        State {
            partitions,
            partitioner,
        }
    }
}
pub use builder::Builder;