use crate::client::{self, KafkaClient};
use crate::error::{Error, Result};
use std::collections::HashMap;
use std::fmt;
use std::hash::{BuildHasher, BuildHasherDefault, Hasher};
use std::slice::from_ref;
use std::time::Duration;
use twox_hash::XxHash32;
#[cfg(feature = "producer_timestamp")]
use crate::protocol::produce::ProducerTimestamp;
#[cfg(feature = "security")]
use crate::client::SecurityConfig;
#[cfg(not(feature = "security"))]
// Stub alias so `Builder` can carry a `security_config` field unconditionally;
// without the "security" feature the config is the unit type and is ignored.
type SecurityConfig = ();
use crate::client_internals::KafkaClientInternals;
use crate::protocol;
pub use crate::client::{Compression, ProduceConfirm, ProducePartitionConfirm, RequiredAcks};
/// Default acknowledgement timeout for produce requests: 30 seconds (in milliseconds).
pub const DEFAULT_ACK_TIMEOUT_MILLIS: u64 = 30 * 1000;
/// Default acknowledgement level requested from brokers.
pub const DEFAULT_REQUIRED_ACKS: RequiredAcks = RequiredAcks::One;
/// Types whose content can be viewed as a byte slice; implemented for the
/// key and value types accepted by [`Record`].
pub trait AsBytes {
    /// Borrows the receiver's content as raw bytes.
    fn as_bytes(&self) -> &[u8];
}
/// The unit type stands for "no key/value"; it always yields an empty slice
/// (which `send_all` later maps to `None`).
impl AsBytes for () {
    fn as_bytes(&self) -> &[u8] {
        &[]
    }
}

impl AsBytes for String {
    fn as_bytes(&self) -> &[u8] {
        // UFCS call to the inherent method, not this trait method.
        String::as_bytes(self)
    }
}

impl AsBytes for Vec<u8> {
    fn as_bytes(&self) -> &[u8] {
        self.as_slice()
    }
}

impl AsBytes for &[u8] {
    fn as_bytes(&self) -> &[u8] {
        *self
    }
}

impl AsBytes for &str {
    fn as_bytes(&self) -> &[u8] {
        // UFCS to the inherent `str::as_bytes`; a plain method call on
        // `&str` would resolve back to this impl and recurse.
        str::as_bytes(self)
    }
}
/// A single message destined for a Kafka topic.
pub struct Record<'a, K, V> {
    /// Message key; an empty byte representation is transmitted as "no key".
    pub key: K,
    /// Message payload; an empty byte representation is transmitted as "no value".
    pub value: V,
    /// Destination topic.
    pub topic: &'a str,
    /// Destination partition; a negative value means "let the partitioner decide".
    pub partition: i32,
}
impl<'a, K, V> Record<'a, K, V> {
    /// Builds a keyed record for `topic`, leaving the partition unassigned
    /// (-1) so a `Partitioner` can choose one at send time.
    #[inline]
    pub fn from_key_value(topic: &'a str, key: K, value: V) -> Record<'a, K, V> {
        Record {
            topic,
            key,
            value,
            partition: -1,
        }
    }

    /// Pins the record to an explicit partition, bypassing the partitioner.
    #[inline]
    #[must_use]
    pub fn with_partition(mut self, partition: i32) -> Self {
        self.partition = partition;
        self
    }
}
impl<'a, V> Record<'a, (), V> {
    /// Builds a key-less record for `topic`; the partition is left
    /// unassigned (-1) for the partitioner to fill in.
    #[inline]
    pub fn from_value(topic: &'a str, value: V) -> Record<'a, (), V> {
        Record {
            topic,
            value,
            key: (),
            partition: -1,
        }
    }
}
// Hand-written (rather than derived) Debug so `topic` prints bare via
// `Display` while key and value use their `Debug` representations.
impl<K: fmt::Debug, V: fmt::Debug> fmt::Debug for Record<'_, K, V> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "Record {{ topic: {}, partition: {}, key: {:?}, value: {:?} }}",
            self.topic, self.partition, self.key, self.value
        )
    }
}
/// A Kafka producer: a `KafkaClient` plus partitioning state and the
/// configuration applied to every produce request.
pub struct Producer<P = DefaultPartitioner> {
    client: KafkaClient,
    state: State<P>,
    config: Config,
}

/// Mutable producer-side state: the cached per-topic partition table and
/// the partitioner that assigns partitions to outgoing records.
struct State<P> {
    // topic name -> partition metadata, snapshotted at construction time
    partitions: HashMap<String, Partitions>,
    partitioner: P,
}

/// Per-request produce settings, pre-converted to wire types.
struct Config {
    // maximum time (milliseconds) a broker may wait for the required acks
    ack_timeout: i32,
    // acknowledgement level (RequiredAcks cast to i16)
    required_acks: i16,
}
impl Producer {
    /// Starts a builder from an already-configured client.
    #[must_use]
    pub fn from_client(client: KafkaClient) -> Builder<DefaultPartitioner> {
        Builder::new(Some(client), Vec::new())
    }

    /// Starts a builder that will construct its own client for `hosts`.
    #[must_use]
    pub fn from_hosts(hosts: Vec<String>) -> Builder<DefaultPartitioner> {
        Builder::new(None, hosts)
    }

    /// Consumes the producer, yielding the wrapped client.
    #[must_use]
    pub fn into_client(self) -> KafkaClient {
        self.client
    }

    /// Shared access to the underlying client.
    #[must_use]
    pub fn client(&self) -> &KafkaClient {
        &self.client
    }

    /// Exclusive access to the underlying client.
    pub fn client_mut(&mut self) -> &mut KafkaClient {
        &mut self.client
    }
}
impl<P: Partitioner> Producer<P> {
    /// Sends a single record.
    ///
    /// With `required_acks == 0` (fire-and-forget) this returns `Ok(())`
    /// without inspecting any confirms; otherwise the single expected
    /// partition confirm is checked and a broker-side error code is
    /// surfaced as `Error::Kafka`.
    pub fn send<K, V>(&mut self, rec: &Record<K, V>) -> Result<()>
    where
        K: AsBytes,
        V: AsBytes,
    {
        let mut rs = self.send_all(from_ref(rec))?;
        if self.config.required_acks == 0 {
            // No acks requested: brokers send no confirms back at all.
            Ok(())
        } else {
            // Exactly one record was sent, so exactly one topic confirm
            // carrying exactly one partition confirm must come back.
            assert_eq!(1, rs.len());
            let mut produce_confirm = rs.pop().unwrap();
            assert_eq!(1, produce_confirm.partition_confirms.len());
            produce_confirm
                .partition_confirms
                .pop()
                .unwrap()
                .offset
                .map_err(Error::Kafka)?;
            Ok(())
        }
    }

    /// Sends a batch of records, returning the per-topic confirms the
    /// brokers produced (empty when `required_acks == 0`).
    pub fn send_all<K, V>(&mut self, recs: &[Record<'_, K, V>]) -> Result<Vec<ProduceConfirm>>
    where
        K: AsBytes,
        V: AsBytes,
    {
        // Split the borrows of `self` up front so the closure below can
        // borrow the partitioner mutably while reading the partition table
        // and while the client is borrowed mutably for the request.
        let partitioner = &mut self.state.partitioner;
        let partitions = &self.state.partitions;
        let client = &mut self.client;
        let config = &self.config;
        client.internal_produce_messages(
            config.required_acks,
            config.ack_timeout,
            recs.iter().map(|r| {
                let mut m = client::ProduceMessage {
                    // Empty key/value bytes are sent as absent (None).
                    key: to_option(r.key.as_bytes()),
                    value: to_option(r.value.as_bytes()),
                    topic: r.topic,
                    partition: r.partition,
                };
                // Let the partitioner assign (or keep) the target partition.
                partitioner.partition(Topics::new(partitions), &mut m);
                m
            }),
        )
    }
}
/// Maps an empty byte slice to `None`, any non-empty slice to `Some(data)`.
fn to_option(data: &[u8]) -> Option<&[u8]> {
    (!data.is_empty()).then_some(data)
}
impl<P> State<P> {
    /// Snapshots the client's current topic metadata into the partition
    /// table the partitioner will consult.
    fn new(client: &mut KafkaClient, partitioner: P) -> State<P> {
        let topics = client.topics();
        let mut table = HashMap::with_capacity(topics.len());
        for topic in topics {
            let parts = topic.partitions();
            table.insert(
                topic.name().to_owned(),
                Partitions {
                    available_ids: parts.available_ids(),
                    num_all_partitions: parts.len() as u32,
                },
            );
        }
        State {
            partitions: table,
            partitioner,
        }
    }
}
/// Builder for a [`Producer`], created via `Producer::from_client` or
/// `Producer::from_hosts`.
pub struct Builder<P = DefaultPartitioner> {
    // pre-existing client to reuse; when None, one is built from `hosts`
    client: Option<KafkaClient>,
    hosts: Vec<String>,
    compression: Compression,
    ack_timeout: Duration,
    conn_idle_timeout: Duration,
    required_acks: RequiredAcks,
    partitioner: P,
    // only consulted when a new client is constructed (not for `from_client`)
    security_config: Option<SecurityConfig>,
    client_id: Option<String>,
    #[cfg(feature = "producer_timestamp")]
    producer_timestamp: Option<ProducerTimestamp>,
}
impl Builder {
fn new(client: Option<KafkaClient>, hosts: Vec<String>) -> Builder<DefaultPartitioner> {
let mut b = Builder {
client,
hosts,
compression: client::DEFAULT_COMPRESSION,
ack_timeout: Duration::from_millis(DEFAULT_ACK_TIMEOUT_MILLIS),
conn_idle_timeout: Duration::from_millis(
client::DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS,
),
required_acks: DEFAULT_REQUIRED_ACKS,
partitioner: DefaultPartitioner::default(),
security_config: None,
client_id: None,
#[cfg(feature = "producer_timestamp")]
producer_timestamp: None,
};
if let Some(ref c) = b.client {
b.compression = c.compression();
b.conn_idle_timeout = c.connection_idle_timeout();
}
b
}
#[cfg(feature = "security")]
#[must_use]
pub fn with_security(mut self, security: SecurityConfig) -> Self {
self.security_config = Some(security);
self
}
#[must_use]
pub fn with_compression(mut self, compression: Compression) -> Self {
self.compression = compression;
self
}
#[must_use]
pub fn with_ack_timeout(mut self, timeout: Duration) -> Self {
self.ack_timeout = timeout;
self
}
#[must_use]
pub fn with_connection_idle_timeout(mut self, timeout: Duration) -> Self {
self.conn_idle_timeout = timeout;
self
}
#[must_use]
pub fn with_required_acks(mut self, acks: RequiredAcks) -> Self {
self.required_acks = acks;
self
}
#[must_use]
pub fn with_client_id(mut self, client_id: String) -> Self {
self.client_id = Some(client_id);
self
}
#[cfg(feature = "producer_timestamp")]
#[must_use]
pub fn with_timestamp(mut self, timestamp: ProducerTimestamp) -> Self {
self.producer_timestamp = Some(timestamp);
self
}
}
impl<P> Builder<P> {
    /// Replaces the partitioner, keeping every other builder setting.
    ///
    /// Fix: this used to reset `security_config`, `client_id` (and, with
    /// the `producer_timestamp` feature, the timestamp) to `None`, silently
    /// discarding settings made before this call — e.g.
    /// `.with_security(..).with_partitioner(..)` lost the security config.
    /// All previously configured values are now carried over.
    pub fn with_partitioner<Q: Partitioner>(self, partitioner: Q) -> Builder<Q> {
        Builder {
            client: self.client,
            hosts: self.hosts,
            compression: self.compression,
            ack_timeout: self.ack_timeout,
            conn_idle_timeout: self.conn_idle_timeout,
            required_acks: self.required_acks,
            partitioner,
            security_config: self.security_config,
            client_id: self.client_id,
            #[cfg(feature = "producer_timestamp")]
            producer_timestamp: self.producer_timestamp,
        }
    }

    // Without the "security" feature the config is the unit stub; ignore it.
    #[cfg(not(feature = "security"))]
    fn new_kafka_client(hosts: Vec<String>, _: Option<SecurityConfig>) -> KafkaClient {
        KafkaClient::new(hosts)
    }

    // With the "security" feature, build a secure client when a config is set.
    #[cfg(feature = "security")]
    fn new_kafka_client(hosts: Vec<String>, security: Option<SecurityConfig>) -> KafkaClient {
        if let Some(security) = security {
            KafkaClient::new_secure(hosts, security)
        } else {
            KafkaClient::new(hosts)
        }
    }

    /// Finalizes the builder into a [`Producer`].
    ///
    /// # Errors
    /// Fails when the ack timeout does not fit into `i32` milliseconds, or
    /// when the initial metadata load fails (only attempted for clients
    /// constructed here, not for one supplied via `from_client`).
    pub fn create(self) -> Result<Producer<P>> {
        // A caller-supplied client is assumed to hold metadata already.
        let (mut client, need_metadata) = match self.client {
            Some(client) => (client, false),
            None => (
                Self::new_kafka_client(self.hosts, self.security_config),
                true,
            ),
        };
        client.set_compression(self.compression);
        client.set_connection_idle_timeout(self.conn_idle_timeout);
        #[cfg(feature = "producer_timestamp")]
        client.set_producer_timestamp(self.producer_timestamp);
        if let Some(client_id) = self.client_id {
            client.set_client_id(client_id);
        }
        let producer_config = Config {
            ack_timeout: protocol::to_millis_i32(self.ack_timeout)?,
            required_acks: self.required_acks as i16,
        };
        if need_metadata {
            client.load_metadata_all()?;
        }
        let state = State::new(&mut client, self.partitioner);
        Ok(Producer {
            client,
            state,
            config: producer_config,
        })
    }
}
/// Read-only view over the producer's per-topic partition table, handed to
/// `Partitioner::partition`.
pub struct Topics<'a> {
    partitions: &'a HashMap<String, Partitions>,
}

/// Partition metadata snapshot for a single topic.
pub struct Partitions {
    // ids of partitions that were available when the snapshot was taken
    available_ids: Vec<i32>,
    // total partition count, including currently unavailable partitions
    num_all_partitions: u32,
}
impl Partitions {
    /// Identifiers of the partitions recorded as available.
    #[inline]
    #[must_use]
    pub fn available_ids(&self) -> &[i32] {
        self.available_ids.as_slice()
    }

    /// Number of available partitions.
    #[inline]
    #[must_use]
    pub fn num_available(&self) -> u32 {
        self.available_ids.len() as u32
    }

    /// Total number of partitions for the topic, available or not.
    #[inline]
    #[must_use]
    pub fn num_all(&self) -> u32 {
        self.num_all_partitions
    }
}
impl<'a> Topics<'a> {
    // Internal constructor; only the producer builds these views.
    fn new(partitions: &'a HashMap<String, Partitions>) -> Topics<'a> {
        Topics { partitions }
    }

    /// Partition metadata for `topic`, or `None` if the topic is unknown.
    #[inline]
    #[must_use]
    pub fn partitions(&'a self, topic: &str) -> Option<&'a Partitions> {
        self.partitions.get(topic)
    }
}
/// Strategy for assigning a partition to an outgoing message.
///
/// Implementations may set `msg.partition`; a value left negative defers
/// the choice downstream.
pub trait Partitioner {
    fn partition(&mut self, topics: Topics<'_>, msg: &mut client::ProduceMessage<'_, '_>);
}

/// Hasher used by [`DefaultPartitioner`] unless overridden.
pub type DefaultHasher = XxHash32;

/// Default strategy: hash of the key modulo the total partition count for
/// keyed messages; round-robin over available partitions for key-less ones.
#[derive(Default)]
pub struct DefaultPartitioner<H = BuildHasherDefault<DefaultHasher>> {
    hash_builder: H,
    // round-robin counter for key-less messages
    cntr: u32,
}
impl DefaultPartitioner {
pub fn with_hasher<B: BuildHasher>(hash_builder: B) -> DefaultPartitioner<B> {
DefaultPartitioner {
hash_builder,
cntr: 0,
}
}
#[must_use]
pub fn with_default_hasher<B>() -> DefaultPartitioner<BuildHasherDefault<B>>
where
B: Hasher + Default,
{
DefaultPartitioner {
hash_builder: BuildHasherDefault::<B>::default(),
cntr: 0,
}
}
}
impl<H: BuildHasher> Partitioner for DefaultPartitioner<H> {
    #[allow(unused_variables)]
    fn partition(&mut self, topics: Topics<'_>, rec: &mut client::ProduceMessage<'_, '_>) {
        // Respect an explicitly chosen partition.
        if rec.partition >= 0 {
            return;
        }
        // Unknown topic: leave the partition unassigned (negative).
        let Some(partitions) = topics.partitions(rec.topic) else {
            return;
        };
        if let Some(key) = rec.key {
            // Keyed message: hash modulo ALL partitions (not just the
            // available ones) so a key keeps mapping to the same partition
            // even while some partitions are temporarily unavailable.
            let num_partitions = partitions.num_all();
            if num_partitions == 0 {
                return;
            }
            let mut h = self.hash_builder.build_hasher();
            h.write(key);
            // Truncate the 64-bit hash to 32 bits before the modulo.
            let hash = h.finish() as u32;
            rec.partition = (hash % num_partitions) as i32;
        } else {
            // Key-less message: round-robin across available partitions.
            let avail = partitions.available_ids();
            if !avail.is_empty() {
                rec.partition = avail[self.cntr as usize % avail.len()];
                // Wrapping add keeps the counter valid past u32::MAX sends.
                self.cntr = self.cntr.wrapping_add(1);
            }
        }
    }
}
#[cfg(test)]
mod default_partitioner_tests {
    use std::collections::HashMap;
    use std::hash::{BuildHasherDefault, Hasher};
    use super::{DefaultHasher, DefaultPartitioner, Partitioner, Partitions, Topics};
    use crate::client;

    // Builds the topic -> partition-metadata map used by the tests below.
    fn topics_map(topics: Vec<(&str, Partitions)>) -> HashMap<String, Partitions> {
        let mut h = HashMap::new();
        for topic in topics {
            h.insert(topic.0.into(), topic.1);
        }
        h
    }

    // Partitions a keyed message for `topic`, asserts the chosen partition
    // is within [0, num_all_partitions), and returns it for comparisons.
    fn assert_partitioning<P: Partitioner>(
        topics: &HashMap<String, Partitions>,
        p: &mut P,
        topic: &str,
        key: &str,
    ) -> i32 {
        let mut msg = client::ProduceMessage {
            key: Some(key.as_bytes()),
            value: None,
            topic,
            partition: -1,
        };
        p.partition(Topics::new(topics), &mut msg);
        let num_partitions = topics.get(topic).unwrap().num_all_partitions as i32;
        assert!(msg.partition >= 0 && msg.partition < num_partitions);
        msg.partition
    }

    // The hash-based default partitioner must be deterministic per key and
    // should map these two distinct keys to distinct partitions.
    #[test]
    fn test_key_partitioning() {
        let h = topics_map(vec![
            (
                "foo",
                Partitions {
                    available_ids: vec![0, 1, 4],
                    num_all_partitions: 5,
                },
            ),
            (
                "bar",
                Partitions {
                    available_ids: vec![0, 1],
                    num_all_partitions: 2,
                },
            ),
        ]);
        let mut p: DefaultPartitioner<BuildHasherDefault<DefaultHasher>> =
            DefaultPartitioner::default();
        // Same key twice -> same partition.
        let h1 = assert_partitioning(&h, &mut p, "foo", "foo-key");
        let h2 = assert_partitioning(&h, &mut p, "foo", "foo-key");
        assert_eq!(h1, h2);
        // Different key -> different partition (holds for these keys).
        let h3 = assert_partitioning(&h, &mut p, "foo", "foo-key");
        let h4 = assert_partitioning(&h, &mut p, "foo", "bar-key");
        assert!(h3 != h4);
    }

    // Hasher stub whose "hash" is just the first input byte, making the
    // expected partition trivially computable by hand.
    #[derive(Default)]
    struct MyCustomHasher(u64);

    impl Hasher for MyCustomHasher {
        fn finish(&self) -> u64 {
            self.0
        }
        fn write(&mut self, bytes: &[u8]) {
            self.0 = u64::from(bytes[0]);
        }
    }

    #[test]
    fn default_partitioner_with_custom_hasher_default() {
        let mut p = DefaultPartitioner::with_default_hasher::<MyCustomHasher>();
        let h = topics_map(vec![
            (
                "confirms",
                Partitions {
                    available_ids: vec![0, 1],
                    num_all_partitions: 2,
                },
            ),
            (
                "contents",
                Partitions {
                    available_ids: vec![0, 1, 9],
                    num_all_partitions: 10,
                },
            ),
        ]);
        // 'A' = 65 -> 65 % 2 == 1
        let p1 = assert_partitioning(&h, &mut p, "confirms", "A" );
        assert_eq!(1, p1);
        // 'B' = 66 -> 66 % 10 == 6
        let p2 = assert_partitioning(&h, &mut p, "contents", "B" );
        assert_eq!(6, p2);
    }
}