pub use crate::compression::Compression;
#[cfg(feature = "producer_timestamp")]
pub use crate::protocol::produce::ProducerTimestamp;
pub use crate::utils::PartitionOffset;
use crate::utils::TimestampedPartitionOffset;
use std;
use std::collections::hash_map;
use std::collections::hash_map::HashMap;
use std::iter::Iterator;
use std::mem;
use std::thread;
use std::time::{Duration, Instant};
use tracing::{debug, trace};
#[cfg(not(feature = "producer_timestamp"))]
use crate::protocol::produce::ProducerTimestamp;
#[cfg(feature = "security")]
pub use self::network::SecurityConfig;
use crate::error::{Error, KafkaCode, Result};
use crate::protocol;
use crate::client_internals::KafkaClientInternals;
pub mod metadata;
pub(crate) mod network;
mod state;
/// Owned fetch-response types, re-exported unchanged from
/// `crate::protocol::fetch`.
pub mod fetch_kp {
pub use crate::protocol::fetch::{
OwnedData, OwnedFetchResponse, OwnedMessage, OwnedPartition, OwnedTopic,
};
}
/// Fetch-response types re-exported from `crate::protocol::fetch`, with
/// `OwnedFetchResponse` additionally exposed under the shorter name `Response`.
pub mod fetch {
pub use crate::protocol::fetch::OwnedFetchResponse as Response;
pub use crate::protocol::fetch::{OwnedData, OwnedMessage, OwnedPartition, OwnedTopic};
}
/// Default connection read/write timeout in seconds; zero means "no timeout".
const DEFAULT_CONNECTION_RW_TIMEOUT_SECS: u64 = 120;

/// Turns [`DEFAULT_CONNECTION_RW_TIMEOUT_SECS`] into the optional timeout
/// handed to the connection pool.
///
/// Returns `None` when the constant is zero, otherwise `Some` with the
/// constant interpreted as whole seconds.
fn default_conn_rw_timeout() -> Option<Duration> {
    if DEFAULT_CONNECTION_RW_TIMEOUT_SECS == 0 {
        None
    } else {
        Some(Duration::from_secs(DEFAULT_CONNECTION_RW_TIMEOUT_SECS))
    }
}
/// Compression setting a freshly constructed `KafkaClient` starts with.
pub const DEFAULT_COMPRESSION: Compression = Compression::NONE;
/// Default fetch max-wait time, in milliseconds, of a new `KafkaClient`.
pub const DEFAULT_FETCH_MAX_WAIT_TIME_MILLIS: u64 = 100;
/// Default `fetch_min_bytes` setting of a new `KafkaClient`.
pub const DEFAULT_FETCH_MIN_BYTES: i32 = 4096;
/// Default `fetch_max_bytes_per_partition` setting of a new `KafkaClient`
/// (32 * 1024).
pub const DEFAULT_FETCH_MAX_BYTES_PER_PARTITION: i32 = 32 * 1024;
/// Default `fetch_crc_validation` setting of a new `KafkaClient`.
pub const DEFAULT_FETCH_CRC_VALIDATION: bool = true;
/// Default group offset storage of a new `KafkaClient`: none selected.
pub const DEFAULT_GROUP_OFFSET_STORAGE: Option<GroupOffsetStorage> = None;
/// Default pause between retry attempts, in milliseconds.
pub const DEFAULT_RETRY_BACKOFF_TIME_MILLIS: u64 = 100;
/// Default maximum number of attempts for retried requests: as many attempts
/// as fit into 120_000 ms at the default backoff (i.e. 1200).
pub const DEFAULT_RETRY_MAX_ATTEMPTS: u32 = 120_000 / DEFAULT_RETRY_BACKOFF_TIME_MILLIS as u32;
/// Default idle timeout, in milliseconds, handed to the connection pool.
pub const DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS: u64 = 540_000;
/// Default producer timestamp setting of a new `KafkaClient`: none.
pub(crate) const DEFAULT_PRODUCER_TIMESTAMP: Option<ProducerTimestamp> = None;
/// Client bundling everything needed to talk to the configured hosts:
/// settings, a connection pool, cached cluster state and a per-host cache of
/// negotiated API versions.
#[derive(Debug)]
pub struct KafkaClient {
// User-adjustable settings (see the setter/getter methods).
config: ClientConfig,
// Pool handing out connections per host.
conn_pool: network::Connections,
// Correlation ids, topic/partition metadata and group coordinators.
state: state::ClientState,
// API versions obtained per host during metadata fetching.
api_versions: crate::protocol::api_versions::ApiVersionCache,
}
/// Settings of a `KafkaClient`; every field is exposed through a
/// getter/setter pair on the client.
#[derive(Debug)]
struct ClientConfig {
// Client id sent along with requests; empty by default.
client_id: String,
// Hosts tried, in order, when fetching metadata.
hosts: Vec<String>,
// Compression passed to produce requests.
compression: Compression,
// Fetch max-wait time in milliseconds (converted via `protocol::to_millis_i32`).
fetch_max_wait_time: i32,
// Minimum bytes passed to fetch requests.
fetch_min_bytes: i32,
// Per-partition byte limit used when a `FetchPartition` specifies none (`max_bytes <= 0`).
fetch_max_bytes_per_partition: i32,
// NOTE(review): stored and exposed, but not read by any request code in this file.
fetch_crc_validation: bool,
// NOTE(review): stored and exposed, but not read by any request code in this file.
offset_storage: Option<GroupOffsetStorage>,
// Pause between attempts of retried group requests.
retry_backoff_time: Duration,
// Upper bound on attempts of retried group requests.
retry_max_attempts: u32,
// Only accessed through accessors gated on the `producer_timestamp`
// feature, hence the `allow(unused)` for builds without it.
#[allow(unused)]
producer_timestamp: Option<ProducerTimestamp>,
}
/// Selects which offset to look up when querying partition offsets.
#[derive(Debug, Copy, Clone)]
pub enum FetchOffset {
    /// Encoded on the wire as `-2`.
    Earliest,
    /// Encoded on the wire as `-1`.
    Latest,
    /// A caller-supplied value, passed through unchanged.
    ByTime(i64),
}

impl FetchOffset {
    /// Returns the `i64` value sent in offset requests for this selector:
    /// `-2` for `Earliest`, `-1` for `Latest`, and the wrapped value for
    /// `ByTime`.
    fn to_kafka_value(self) -> i64 {
        const EARLIEST: i64 = -2;
        const LATEST: i64 = -1;
        match self {
            Self::ByTime(value) => value,
            Self::Latest => LATEST,
            Self::Earliest => EARLIEST,
        }
    }
}
/// Selects a storage backend for group offsets (stored in the client via
/// `set_group_offset_storage`).
// NOTE(review): no request code in this file reads this setting; confirm
// where (or whether) the selection takes effect.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum GroupOffsetStorage {
Zookeeper,
Kafka,
}
/// Identifies one topic partition whose group offset should be fetched.
#[derive(Debug)]
pub struct FetchGroupOffset<'a> {
    /// Name of the topic.
    pub topic: &'a str,
    /// Partition id within the topic.
    pub partition: i32,
}

impl<'a> FetchGroupOffset<'a> {
    /// Builds a request entry for `partition` of `topic`.
    #[inline]
    #[must_use]
    pub fn new(topic: &'a str, partition: i32) -> Self {
        Self { partition, topic }
    }
}

/// Lets APIs accept both owned and borrowed `FetchGroupOffset` values.
impl<'a> AsRef<FetchGroupOffset<'a>> for FetchGroupOffset<'a> {
    fn as_ref(&self) -> &FetchGroupOffset<'a> {
        self
    }
}
/// One offset to commit for a topic partition.
#[derive(Debug)]
pub struct CommitOffset<'a> {
    /// Offset to commit.
    pub offset: i64,
    /// Name of the topic.
    pub topic: &'a str,
    /// Partition id within the topic.
    pub partition: i32,
}

impl<'a> CommitOffset<'a> {
    /// Builds a commit entry for `partition` of `topic` at `offset`.
    #[must_use]
    pub fn new(topic: &'a str, partition: i32, offset: i64) -> Self {
        Self {
            topic,
            partition,
            offset,
        }
    }
}

/// Lets APIs accept both owned and borrowed `CommitOffset` values.
impl<'a> AsRef<CommitOffset<'a>> for CommitOffset<'a> {
    fn as_ref(&self) -> &CommitOffset<'a> {
        self
    }
}
/// Acknowledgement level requested when producing messages.
///
/// The discriminants are the values sent to the broker: `produce_messages_kp`
/// casts the variant to `i16`, and a value of `0` makes the client skip
/// reading a produce response.
#[derive(Debug, Copy, Clone)]
pub enum RequiredAcks {
None = 0,
One = 1,
All = -1,
}
/// A single message to produce to one topic partition.
#[derive(Debug)]
pub struct ProduceMessage<'a, 'b> {
    /// Optional message key.
    pub key: Option<&'b [u8]>,
    /// Optional message payload.
    pub value: Option<&'b [u8]>,
    /// Name of the target topic.
    pub topic: &'a str,
    /// Target partition id.
    pub partition: i32,
}

/// Lets APIs accept both owned and borrowed `ProduceMessage` values.
impl<'a, 'b> AsRef<ProduceMessage<'a, 'b>> for ProduceMessage<'a, 'b> {
    fn as_ref(&self) -> &ProduceMessage<'a, 'b> {
        self
    }
}

impl<'a, 'b> ProduceMessage<'a, 'b> {
    /// Builds a message for `partition` of `topic` carrying the given
    /// optional `key` and `value`.
    #[must_use]
    pub fn new(
        topic: &'a str,
        partition: i32,
        key: Option<&'b [u8]>,
        value: Option<&'b [u8]>,
    ) -> Self {
        Self {
            topic,
            partition,
            key,
            value,
        }
    }
}
/// Describes one topic partition to fetch messages from.
#[derive(Debug)]
pub struct FetchPartition<'a> {
    /// Name of the topic.
    pub topic: &'a str,
    /// Offset to start fetching from.
    pub offset: i64,
    /// Partition id within the topic.
    pub partition: i32,
    /// Byte limit for this partition; values `<= 0` make the client fall back
    /// to its `fetch_max_bytes_per_partition` setting.
    pub max_bytes: i32,
}

impl<'a> FetchPartition<'a> {
    /// Builds a fetch entry for `partition` of `topic` starting at `offset`,
    /// with `max_bytes` set to `-1` (no explicit limit).
    #[must_use]
    pub fn new(topic: &'a str, partition: i32, offset: i64) -> Self {
        Self {
            max_bytes: -1,
            offset,
            partition,
            topic,
        }
    }

    /// Returns this entry with `max_bytes` replaced by the given value.
    #[must_use]
    pub fn with_max_bytes(self, max_bytes: i32) -> Self {
        Self { max_bytes, ..self }
    }
}

/// Lets APIs accept both owned and borrowed `FetchPartition` values.
impl<'a> AsRef<FetchPartition<'a>> for FetchPartition<'a> {
    fn as_ref(&self) -> &FetchPartition<'a> {
        self
    }
}
/// Per-topic result of a produce request.
#[derive(Debug)]
pub struct ProduceConfirm {
/// Name of the topic the confirmations belong to.
pub topic: String,
/// One confirmation per partition of `topic`.
pub partition_confirms: Vec<ProducePartitionConfirm>,
}
/// Per-partition result of a produce request.
#[derive(Debug)]
pub struct ProducePartitionConfirm {
/// Either an offset (`Ok`) or the error code reported for this partition (`Err`).
pub offset: std::result::Result<i64, KafkaCode>,
/// Partition id the result refers to.
pub partition: i32,
}
impl KafkaClient {
/// Creates a client for the given hosts with every setting at its
/// `DEFAULT_*` value, an empty client id, fresh client state and an empty
/// API version cache.
///
/// # Panics
///
/// Panics if `DEFAULT_FETCH_MAX_WAIT_TIME_MILLIS` cannot be converted by
/// `protocol::to_millis_i32`.
#[must_use]
pub fn new(hosts: Vec<String>) -> KafkaClient {
    let fetch_max_wait_time =
        protocol::to_millis_i32(Duration::from_millis(DEFAULT_FETCH_MAX_WAIT_TIME_MILLIS))
            .expect("invalid default-fetch-max-time-millis");
    let config = ClientConfig {
        client_id: String::new(),
        hosts,
        compression: DEFAULT_COMPRESSION,
        fetch_max_wait_time,
        fetch_min_bytes: DEFAULT_FETCH_MIN_BYTES,
        fetch_max_bytes_per_partition: DEFAULT_FETCH_MAX_BYTES_PER_PARTITION,
        fetch_crc_validation: DEFAULT_FETCH_CRC_VALIDATION,
        offset_storage: DEFAULT_GROUP_OFFSET_STORAGE,
        retry_backoff_time: Duration::from_millis(DEFAULT_RETRY_BACKOFF_TIME_MILLIS),
        retry_max_attempts: DEFAULT_RETRY_MAX_ATTEMPTS,
        producer_timestamp: DEFAULT_PRODUCER_TIMESTAMP,
    };
    let idle_timeout = Duration::from_millis(DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS);
    let conn_pool = network::Connections::new(default_conn_rw_timeout(), idle_timeout);
    KafkaClient {
        config,
        conn_pool,
        state: state::ClientState::new(),
        api_versions: crate::protocol::api_versions::ApiVersionCache::new(),
    }
}
/// Creates a client like `new`, but with a connection pool built from the
/// given security configuration.
///
/// Only available with the `security` feature.
///
/// # Panics
///
/// Panics if `DEFAULT_FETCH_MAX_WAIT_TIME_MILLIS` cannot be converted by
/// `protocol::to_millis_i32`.
#[cfg(feature = "security")]
#[must_use]
pub fn new_secure(hosts: Vec<String>, security: SecurityConfig) -> KafkaClient {
    let fetch_max_wait_time =
        protocol::to_millis_i32(Duration::from_millis(DEFAULT_FETCH_MAX_WAIT_TIME_MILLIS))
            .expect("invalid default-fetch-max-time-millis");
    let config = ClientConfig {
        client_id: String::new(),
        hosts,
        compression: DEFAULT_COMPRESSION,
        fetch_max_wait_time,
        fetch_min_bytes: DEFAULT_FETCH_MIN_BYTES,
        fetch_max_bytes_per_partition: DEFAULT_FETCH_MAX_BYTES_PER_PARTITION,
        fetch_crc_validation: DEFAULT_FETCH_CRC_VALIDATION,
        offset_storage: DEFAULT_GROUP_OFFSET_STORAGE,
        retry_backoff_time: Duration::from_millis(DEFAULT_RETRY_BACKOFF_TIME_MILLIS),
        retry_max_attempts: DEFAULT_RETRY_MAX_ATTEMPTS,
        producer_timestamp: DEFAULT_PRODUCER_TIMESTAMP,
    };
    let idle_timeout = Duration::from_millis(DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS);
    let conn_pool = network::Connections::new_with_security(
        default_conn_rw_timeout(),
        idle_timeout,
        Some(security),
    );
    KafkaClient {
        config,
        conn_pool,
        state: state::ClientState::new(),
        api_versions: crate::protocol::api_versions::ApiVersionCache::new(),
    }
}
/// Returns the hosts stored in this client's configuration.
#[inline]
#[must_use]
pub fn hosts(&self) -> &[String] {
&self.config.hosts
}
/// Sets the client id passed along with subsequent requests.
pub fn set_client_id(&mut self, client_id: String) {
self.config.client_id = client_id;
}
/// Returns the currently configured client id (empty unless set).
#[must_use]
pub fn client_id(&self) -> &str {
&self.config.client_id
}
/// Sets the compression passed to subsequent produce requests.
#[inline]
pub fn set_compression(&mut self, compression: Compression) {
self.config.compression = compression;
}
/// Returns the currently configured compression.
#[inline]
#[must_use]
pub fn compression(&self) -> Compression {
self.config.compression
}
/// Sets the max wait time passed to subsequent fetch requests.
///
/// # Errors
///
/// Returns the error of `protocol::to_millis_i32` if `max_wait_time` cannot
/// be converted; the stored setting is left unchanged in that case.
#[inline]
pub fn set_fetch_max_wait_time(&mut self, max_wait_time: Duration) -> Result<()> {
self.config.fetch_max_wait_time = protocol::to_millis_i32(max_wait_time)?;
Ok(())
}
/// Returns the configured fetch max wait time, rebuilt from the stored
/// millisecond value.
#[inline]
#[must_use]
pub fn fetch_max_wait_time(&self) -> Duration {
// NOTE(review): the cast assumes the stored i32 is never negative —
// presumably guaranteed by `protocol::to_millis_i32`; confirm.
Duration::from_millis(self.config.fetch_max_wait_time as u64)
}
/// Sets the minimum byte count passed to subsequent fetch requests.
#[inline]
pub fn set_fetch_min_bytes(&mut self, min_bytes: i32) {
self.config.fetch_min_bytes = min_bytes;
}
/// Returns the configured minimum byte count for fetch requests.
#[inline]
#[must_use]
pub fn fetch_min_bytes(&self) -> i32 {
self.config.fetch_min_bytes
}
/// Sets the per-partition byte limit used for fetch entries that do not
/// carry a positive `max_bytes` of their own.
#[inline]
pub fn set_fetch_max_bytes_per_partition(&mut self, max_bytes: i32) {
self.config.fetch_max_bytes_per_partition = max_bytes;
}
/// Returns the configured default per-partition byte limit for fetches.
#[inline]
#[must_use]
pub fn fetch_max_bytes_per_partition(&self) -> i32 {
self.config.fetch_max_bytes_per_partition
}
/// Stores the CRC validation setting.
// NOTE(review): no fetch code in this file reads this flag; confirm where
// it takes effect.
#[inline]
pub fn set_fetch_crc_validation(&mut self, validate_crc: bool) {
self.config.fetch_crc_validation = validate_crc;
}
/// Returns the stored CRC validation setting.
#[inline]
#[must_use]
pub fn fetch_crc_validation(&self) -> bool {
self.config.fetch_crc_validation
}
/// Stores the group offset storage selection (`None` for no selection).
#[inline]
pub fn set_group_offset_storage(&mut self, storage: Option<GroupOffsetStorage>) {
self.config.offset_storage = storage;
}
/// Returns the stored group offset storage selection.
#[must_use]
pub fn group_offset_storage(&self) -> Option<GroupOffsetStorage> {
self.config.offset_storage
}
/// Sets how long the client sleeps between attempts of a retried request.
#[inline]
pub fn set_retry_backoff_time(&mut self, time: Duration) {
self.config.retry_backoff_time = time;
}
/// Returns the configured pause between retry attempts.
#[must_use]
pub fn retry_backoff_time(&self) -> Duration {
self.config.retry_backoff_time
}
/// Sets the maximum number of attempts for retried requests.
#[inline]
pub fn set_retry_max_attempts(&mut self, attempts: u32) {
self.config.retry_max_attempts = attempts;
}
/// Returns the configured maximum number of attempts for retried requests.
#[inline]
#[must_use]
pub fn retry_max_attempts(&self) -> u32 {
self.config.retry_max_attempts
}
/// Forwards the given idle timeout to the connection pool.
#[inline]
pub fn set_connection_idle_timeout(&mut self, timeout: Duration) {
self.conn_pool.set_idle_timeout(timeout);
}
/// Returns the idle timeout currently reported by the connection pool.
#[inline]
#[must_use]
pub fn connection_idle_timeout(&self) -> Duration {
self.conn_pool.idle_timeout()
}
/// Stores the producer timestamp setting (`None` to clear it).
///
/// Only available with the `producer_timestamp` feature.
#[cfg(feature = "producer_timestamp")]
#[inline]
pub fn set_producer_timestamp(&mut self, producer_timestamp: Option<ProducerTimestamp>) {
self.config.producer_timestamp = producer_timestamp;
}
/// Returns the stored producer timestamp setting.
///
/// Only available with the `producer_timestamp` feature.
#[cfg(feature = "producer_timestamp")]
#[inline]
#[must_use]
pub fn producer_timestamp(&self) -> Option<ProducerTimestamp> {
self.config.producer_timestamp
}
/// Returns a `metadata::Topics` view constructed from this client.
#[inline]
#[must_use]
pub fn topics(&self) -> metadata::Topics<'_> {
metadata::Topics::new(self)
}
/// Clears the cached metadata and then loads metadata using an empty topic
/// list.
///
/// # Errors
///
/// Propagates the error of `load_metadata_kp`. The cache has already been
/// cleared at that point, so a failed reload leaves it empty.
#[inline]
pub fn load_metadata_all(&mut self) -> Result<()> {
self.reset_metadata();
self.load_metadata_kp::<&str>(&[])
}
/// Loads metadata for the given topics; delegates to `load_metadata_kp`.
///
/// # Errors
///
/// Propagates the error of `load_metadata_kp`.
#[inline]
pub fn load_metadata<T: AsRef<str>>(&mut self, topics: &[T]) -> Result<()> {
self.load_metadata_kp(topics)
}
/// Fetches metadata for `topics` from the configured hosts and merges the
/// response into the client state.
///
/// # Errors
///
/// Propagates the error of `fetch_metadata_kp`; the client state is not
/// touched in that case.
pub fn load_metadata_kp<T: AsRef<str>>(&mut self, topics: &[T]) -> Result<()> {
    self.fetch_metadata_kp(topics).map(|metadata| {
        self.state.update_metadata(metadata);
    })
}
/// Clears the metadata cached in the client state.
#[inline]
pub fn reset_metadata(&mut self) {
self.state.clear_metadata();
}
/// Requests metadata for `topics` from the configured hosts, trying them in
/// order and returning the first response that decodes successfully.
///
/// For each host a connection is obtained from the pool; if no API versions
/// are cached for that host yet, they are requested first (a failure there is
/// only logged). Connection, send and decode failures are logged at debug
/// level and the next host is tried.
///
/// # Errors
///
/// Returns `Error::NoHostReachable` when no host yielded a response.
fn fetch_metadata_kp<T: AsRef<str>>(
&mut self,
topics: &[T],
) -> Result<protocol::metadata::MetadataResponseData> {
// One correlation id is allocated up front and reused for every host.
let correlation = self.state.next_correlation_id();
let now = Instant::now();
let topic_strs: Vec<&str> = topics.iter().map(|t| t.as_ref()).collect();
for host in &self.config.hosts {
debug!("fetch_metadata_kp: requesting metadata from {}", host);
match self.conn_pool.get_conn(host, now) {
Ok(conn) => {
// Negotiate API versions once per host; failure is not fatal.
if !self.api_versions.contains(host) {
let av_correlation = self.state.next_correlation_id();
match crate::protocol::api_versions::fetch_api_versions(
conn, av_correlation, &self.config.client_id,
) {
Ok(versions) => {
self.api_versions.insert(host.clone(), versions);
}
Err(e) => debug!(
"fetch_metadata_kp: API version negotiation failed for {}: {}",
host, e
),
}
}
// NOTE(review): an empty `topics` slice is passed as `Some(&[])`;
// how `build_metadata_request` interprets that is not visible here.
let (header, request) =
crate::protocol::metadata::build_metadata_request(correlation, &self.config.client_id, Some(&topic_strs));
match __kp_send_request(conn, &header, &request, crate::protocol::API_VERSION_METADATA) {
Ok(()) => {
match __kp_get_response::<kafka_protocol::messages::MetadataResponse>(conn, crate::protocol::API_VERSION_METADATA) {
Ok(kp_resp) => {
return Ok(crate::protocol::metadata::convert_metadata_response(kp_resp, correlation));
}
Err(e) => debug!(
"fetch_metadata_kp: failed to decode metadata from {}: {}",
host, e
),
}
}
Err(e) => debug!(
"fetch_metadata_kp: failed to request metadata from {}: {}",
host, e
),
}
}
Err(e) => {
debug!("fetch_metadata_kp: failed to connect to {}: {}", host, e);
}
}
}
// Every host failed at some stage.
Err(Error::NoHostReachable)
}
/// Fetches the offsets selected by `offset` for all known partitions of the
/// given topics; delegates to `fetch_offsets_kp`.
///
/// # Errors
///
/// Propagates the error of `fetch_offsets_kp`.
pub fn fetch_offsets<T: AsRef<str>>(
&mut self,
topics: &[T],
offset: FetchOffset,
) -> Result<HashMap<String, Vec<PartitionOffset>>> {
self.fetch_offsets_kp(topics, offset)
}
/// Like `fetch_offsets_kp`, but returns each partition offset wrapped in a
/// `TimestampedPartitionOffset`.
///
/// The `time` field of every returned entry is always `0`; no timestamp is
/// taken from the broker response here.
///
/// # Errors
///
/// Propagates the error of `fetch_offsets_kp`.
pub fn list_offsets<T: AsRef<str>>(
    &mut self,
    topics: &[T],
    offset: FetchOffset,
) -> Result<HashMap<String, Vec<TimestampedPartitionOffset>>> {
    let plain = self.fetch_offsets_kp(topics, offset)?;
    let mut stamped = HashMap::with_capacity(plain.len());
    for (topic, partition_offsets) in plain {
        let entries: Vec<TimestampedPartitionOffset> = partition_offsets
            .into_iter()
            .map(|po| TimestampedPartitionOffset {
                offset: po.offset,
                partition: po.partition,
                time: 0,
            })
            .collect();
        stamped.insert(topic, entries);
    }
    Ok(stamped)
}
/// Consumes a map entry and hands back its key by value.
///
/// An occupied entry is removed from the map in the process (its value is
/// dropped); a vacant entry leaves the map untouched.
fn get_key_from_entry<'a, K: 'a, V: 'a>(entry: hash_map::Entry<'a, K, V>) -> K {
    match entry {
        hash_map::Entry::Vacant(vacant) => vacant.into_key(),
        hash_map::Entry::Occupied(occupied) => {
            let (key, _dropped_value) = occupied.remove_entry();
            key
        }
    }
}
pub fn fetch_topic_offsets<T: AsRef<str>>(
&mut self,
topic: T,
offset: FetchOffset,
) -> Result<Vec<PartitionOffset>> {
let topic = topic.as_ref();
let mut m = self.fetch_offsets(&[topic], offset)?;
let offs = m.remove(topic).unwrap_or_default();
if offs.is_empty() {
Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition))
} else {
Ok(offs)
}
}
/// Fetches messages for the given partitions; delegates to
/// `fetch_messages_kp`.
///
/// # Errors
///
/// Propagates the error of `fetch_messages_kp`.
pub fn fetch_messages<'a, I, J>(
&mut self,
input: I,
) -> Result<Vec<fetch_kp::OwnedFetchResponse>>
where
J: AsRef<FetchPartition<'a>>,
I: IntoIterator<Item = J>,
{
self.fetch_messages_kp(input)
}
/// Fetches messages for a single partition by passing a one-element input to
/// `fetch_messages_kp`.
///
/// # Errors
///
/// Propagates the error of `fetch_messages_kp`.
pub fn fetch_messages_for_partition(
&mut self,
req: &FetchPartition<'_>,
) -> Result<Vec<fetch_kp::OwnedFetchResponse>> {
self.fetch_messages_kp([req])
}
/// Groups the requested partitions by the broker found for them in the
/// client state and issues one fetch request per broker.
///
/// Partitions for which no broker is known are silently skipped. A
/// partition's `max_bytes` is used when positive; otherwise the client's
/// `fetch_max_bytes_per_partition` setting applies.
///
/// # Errors
///
/// Propagates the error of the underlying per-broker fetch.
pub fn fetch_messages_kp<'a, I, J>(
    &mut self,
    input: I,
) -> Result<Vec<fetch_kp::OwnedFetchResponse>>
where
    J: AsRef<FetchPartition<'a>>,
    I: IntoIterator<Item = J>,
{
    let config = &self.config;
    let state = &mut self.state;
    let correlation = state.next_correlation_id();

    // broker host -> [(topic, partition, offset, max_bytes)]
    let mut broker_partitions: HashMap<&str, Vec<(&str, i32, i64, i32)>> = HashMap::new();
    for item in input {
        let wanted = item.as_ref();
        let max_bytes = if wanted.max_bytes > 0 {
            wanted.max_bytes
        } else {
            config.fetch_max_bytes_per_partition
        };
        if let Some(broker) = state.find_broker(wanted.topic, wanted.partition) {
            broker_partitions.entry(broker).or_default().push((
                wanted.topic,
                wanted.partition,
                wanted.offset,
                max_bytes,
            ));
        }
    }

    __fetch_messages_kp(
        &mut self.conn_pool,
        correlation,
        &config.client_id,
        config.fetch_max_wait_time,
        config.fetch_min_bytes,
        broker_partitions,
    )
}
/// Produces the given messages; delegates to `produce_messages_kp`.
///
/// # Errors
///
/// Propagates the error of `produce_messages_kp`.
pub fn produce_messages<'a, 'b, I, J>(
&mut self,
acks: RequiredAcks,
ack_timeout: Duration,
messages: I,
) -> Result<Vec<ProduceConfirm>>
where
J: AsRef<ProduceMessage<'a, 'b>>,
I: IntoIterator<Item = J>,
{
self.produce_messages_kp(acks, ack_timeout, messages)
}
/// Produces the given messages, converting `acks` to its `i16` wire value
/// and `ack_timeout` to milliseconds before handing over to
/// `internal_produce_messages_kp`.
///
/// # Errors
///
/// Returns the error of `protocol::to_millis_i32` if `ack_timeout` cannot be
/// converted, and otherwise propagates the error of the produce itself.
pub fn produce_messages_kp<'a, 'b, I, J>(
    &mut self,
    acks: RequiredAcks,
    ack_timeout: Duration,
    messages: I,
) -> Result<Vec<ProduceConfirm>>
where
    J: AsRef<ProduceMessage<'a, 'b>>,
    I: IntoIterator<Item = J>,
{
    let required_acks = acks as i16;
    let ack_timeout_ms = protocol::to_millis_i32(ack_timeout)?;
    self.internal_produce_messages_kp(required_acks, ack_timeout_ms, messages)
}
/// Groups the messages by the broker found for their topic partition and
/// sends one produce request per broker.
///
/// When `required_acks` is `0` no produce responses are read.
///
/// # Errors
///
/// Returns `KafkaCode::UnknownTopicOrPartition` as soon as a message targets
/// a partition without a known broker (nothing is sent in that case), and
/// otherwise propagates the error of the per-broker produce.
fn internal_produce_messages_kp<'a, 'b, I, J>(
    &mut self,
    required_acks: i16,
    ack_timeout_ms: i32,
    messages: I,
) -> Result<Vec<ProduceConfirm>>
where
    J: AsRef<ProduceMessage<'a, 'b>>,
    I: IntoIterator<Item = J>,
{
    let state = &mut self.state;
    let correlation = state.next_correlation_id();
    let config = &self.config;

    // broker host -> [(topic, partition, key, value)]
    let mut broker_msgs: HashMap<&str, Vec<(&str, i32, Option<&'b [u8]>, Option<&'b [u8]>)>> =
        HashMap::new();
    for item in messages {
        let msg = item.as_ref();
        let broker = match state.find_broker(msg.topic, msg.partition) {
            Some(broker) => broker,
            None => return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition)),
        };
        broker_msgs
            .entry(broker)
            .or_default()
            .push((msg.topic, msg.partition, msg.key, msg.value));
    }

    let no_acks = required_acks == 0;
    __produce_messages_kp(
        &mut self.conn_pool,
        correlation,
        &config.client_id,
        required_acks,
        ack_timeout_ms,
        config.compression,
        broker_msgs,
        no_acks,
    )
}
/// Commits the given offsets for `group`; delegates to `commit_offsets_kp`.
///
/// # Errors
///
/// Propagates the error of `commit_offsets_kp`.
pub fn commit_offsets<'a, J, I>(&mut self, group: &str, offsets: I) -> Result<()>
where
J: AsRef<CommitOffset<'a>>,
I: IntoIterator<Item = J>,
{
self.commit_offsets_kp(group, offsets)
}
/// Commits a single offset for `group`; delegates to `commit_offset_kp`.
///
/// # Errors
///
/// Propagates the error of `commit_offset_kp`.
pub fn commit_offset(
&mut self,
group: &str,
topic: &str,
partition: i32,
offset: i64,
) -> Result<()> {
self.commit_offset_kp(group, topic, partition, offset)
}
/// Fetches the offsets stored for `group` on the given partitions; delegates
/// to `fetch_group_offsets_kp`.
///
/// # Errors
///
/// Propagates the error of `fetch_group_offsets_kp`.
pub fn fetch_group_offsets<'a, J, I>(
&mut self,
group: &str,
partitions: I,
) -> Result<HashMap<String, Vec<PartitionOffset>>>
where
J: AsRef<FetchGroupOffset<'a>>,
I: IntoIterator<Item = J>,
{
self.fetch_group_offsets_kp(group, partitions)
}
/// Fetches the offsets stored for `group` on all known partitions of
/// `topic`; delegates to `fetch_group_topic_offset_kp`.
///
/// # Errors
///
/// Propagates the error of `fetch_group_topic_offset_kp`.
pub fn fetch_group_topic_offset(
&mut self,
group: &str,
topic: &str,
) -> Result<Vec<PartitionOffset>> {
self.fetch_group_topic_offset_kp(group, topic)
}
/// Fetches the offsets selected by `offset` for every known partition of the
/// given topics, one ListOffsets request per broker.
///
/// Topics without partitions in the client state, and partitions without a
/// resolvable broker, are skipped silently and therefore absent from the
/// result.
///
/// # Errors
///
/// Connection, send and decode errors are returned with broker context
/// attached. The first partition that fails `to_offset()` aborts the call
/// with `Error::TopicPartitionError`.
pub fn fetch_offsets_kp<T: AsRef<str>>(
&mut self,
topics: &[T],
offset: FetchOffset,
) -> Result<HashMap<String, Vec<PartitionOffset>>> {
let time = offset.to_kafka_value();
let n_topics = topics.len();
let state = &mut self.state;
let correlation = state.next_correlation_id();
let config = &self.config;
// broker host -> [(topic, partition, time)]
let mut broker_partitions: HashMap<&str, Vec<(&str, i32, i64)>> = HashMap::new();
for topic in topics {
let topic = topic.as_ref();
if let Some(ps) = state.partitions_for(topic) {
for (id, host) in ps
.iter()
.filter_map(|(id, p)| p.broker(state).map(|b| (id, b.host())))
{
broker_partitions
.entry(host)
.or_default()
.push((topic, id, time));
}
}
}
let now = Instant::now();
let mut res: HashMap<String, Vec<PartitionOffset>> = HashMap::with_capacity(n_topics);
for (host, partitions) in broker_partitions {
let conn = self.conn_pool.get_conn(host, now)
.map_err(|e| e.with_broker_context(host, "ListOffsets"))?;
let (header, request) = crate::protocol::offset::build_list_offsets_request(
correlation, &config.client_id, &partitions,
);
__kp_send_request(conn, &header, &request, crate::protocol::API_VERSION_LIST_OFFSETS)
.map_err(|e| e.with_broker_context(host, "ListOffsets"))?;
let kp_resp = __kp_get_response::<kafka_protocol::messages::ListOffsetsResponse>(
conn,
crate::protocol::API_VERSION_LIST_OFFSETS,
).map_err(|e| e.with_broker_context(host, "ListOffsets"))?;
let our_resp = crate::protocol::offset::convert_list_offsets_response(kp_resp, correlation);
for tp in our_resp.topic_partitions {
// The topic name is moved into the entry; it is recovered via
// `get_key_from_entry` if an error has to be reported.
let mut entry = res.entry(tp.topic);
let mut new_resp_offsets = None;
let mut err = None;
{
// Append to the existing list of the topic, or collect into a
// fresh list that is inserted only after all partitions succeeded.
let resp_offsets = match entry {
hash_map::Entry::Occupied(ref mut e) => e.get_mut(),
hash_map::Entry::Vacant(_) => {
new_resp_offsets.get_or_insert(Vec::with_capacity(tp.partitions.len()))
}
};
for p in tp.partitions {
let partition_offset = match p.to_offset() {
Ok(po) => po,
Err(code) => {
err = Some((p.partition, code));
break;
}
};
resp_offsets.push(partition_offset);
}
}
if let Some((partition, code)) = err {
let topic = KafkaClient::get_key_from_entry(entry);
return Err(Error::TopicPartitionError {
topic_name: topic,
partition_id: partition,
error_code: code,
});
}
if let hash_map::Entry::Vacant(e) = entry {
// `new_resp_offsets` was filled above for exactly this vacant case.
e.insert(new_resp_offsets.unwrap());
}
}
}
Ok(res)
}
/// Commits the given offsets for `group` via the group's coordinator.
///
/// Does nothing (beyond a debug log) when `offsets` is empty.
///
/// # Errors
///
/// Returns `KafkaCode::UnknownTopicOrPartition` if any entry refers to a
/// topic partition not present in the client state (nothing is sent then),
/// and otherwise propagates the error of the commit request.
pub fn commit_offsets_kp<'a, J, I>(&mut self, group: &str, offsets: I) -> Result<()>
where
    J: AsRef<CommitOffset<'a>>,
    I: IntoIterator<Item = J>,
{
    let correlation_id = self.state.next_correlation_id();
    let config = &self.config;

    // (topic, partition, offset, metadata) — metadata is always `None` here.
    let mut pending: Vec<(&str, i32, i64, Option<&str>)> = Vec::new();
    for item in offsets {
        let commit = item.as_ref();
        if !self.state.contains_topic_partition(commit.topic, commit.partition) {
            return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition));
        }
        pending.push((commit.topic, commit.partition, commit.offset, None));
    }

    if pending.is_empty() {
        debug!("commit_offsets_kp: no offsets provided");
        return Ok(());
    }

    __commit_offsets_kp(
        &pending,
        group,
        correlation_id,
        &config.client_id,
        &mut self.state,
        &mut self.conn_pool,
        config,
    )
}
/// Commits a single offset for `group` by passing a one-element list to
/// `commit_offsets_kp`.
///
/// # Errors
///
/// Propagates the error of `commit_offsets_kp`.
pub fn commit_offset_kp(&mut self, group: &str, topic: &str, partition: i32, offset: i64) -> Result<()> {
self.commit_offsets_kp(group, &[CommitOffset::new(topic, partition, offset)])
}
/// Fetches the offsets stored for `group` on the given partitions via the
/// group's coordinator.
///
/// # Errors
///
/// Returns `KafkaCode::UnknownTopicOrPartition` if any entry refers to a
/// topic partition not present in the client state (nothing is sent then),
/// and otherwise propagates the error of the fetch request.
pub fn fetch_group_offsets_kp<'a, J, I>(
    &mut self,
    group: &str,
    partitions: I,
) -> Result<HashMap<String, Vec<PartitionOffset>>>
where
    J: AsRef<FetchGroupOffset<'a>>,
    I: IntoIterator<Item = J>,
{
    let correlation_id = self.state.next_correlation_id();
    let config = &self.config;

    let mut wanted: Vec<(&str, i32)> = Vec::new();
    for item in partitions {
        let entry = item.as_ref();
        if !self.state.contains_topic_partition(entry.topic, entry.partition) {
            return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition));
        }
        wanted.push((entry.topic, entry.partition));
    }

    __fetch_group_offsets_kp(
        &wanted,
        group,
        correlation_id,
        &config.client_id,
        &mut self.state,
        &mut self.conn_pool,
        config,
    )
}
/// Fetches the offsets stored for `group` on every partition of `topic`
/// known to the client state.
///
/// Returns an empty list when the response carries no entry for `topic`.
///
/// # Errors
///
/// Returns `KafkaCode::UnknownTopicOrPartition` if the client state has no
/// partitions for `topic`, and otherwise propagates the error of the fetch
/// request.
pub fn fetch_group_topic_offset_kp(
    &mut self,
    group: &str,
    topic: &str,
) -> Result<Vec<PartitionOffset>> {
    let correlation_id = self.state.next_correlation_id();
    let config = &self.config;

    let known = match self.state.partitions_for(topic) {
        Some(known) => known,
        None => return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition)),
    };
    let mut wanted: Vec<(&str, i32)> = Vec::new();
    for (id, _) in known {
        wanted.push((topic, id));
    }

    let mut by_topic = __fetch_group_offsets_kp(
        &wanted,
        group,
        correlation_id,
        &config.client_id,
        &mut self.state,
        &mut self.conn_pool,
        config,
    )?;
    Ok(by_topic.remove(topic).unwrap_or_default())
}
}
/// Crate-internal produce entry point; forwards unchanged to
/// `KafkaClient::internal_produce_messages_kp`.
impl KafkaClientInternals for KafkaClient {
// `required_acks` and `ack_timeout` are passed through as given.
fn internal_produce_messages<'a, 'b, I, J>(
&mut self,
required_acks: i16,
ack_timeout: i32,
messages: I,
) -> Result<Vec<ProduceConfirm>>
where
J: AsRef<ProduceMessage<'a, 'b>>,
I: IntoIterator<Item = J>,
{
self.internal_produce_messages_kp(required_acks, ack_timeout, messages)
}
}
/// Sends one fetch request per broker in `broker_partitions` and collects the
/// converted responses in iteration order of the map.
///
/// Each map value lists `(topic, partition, offset, max_bytes)` tuples for
/// that broker. The same `correlation_id` is used for every request.
///
/// # Errors
///
/// Stops at the first failure. Connection, send and decode errors carry
/// broker context for the "Fetch" API; conversion errors are propagated
/// as-is.
fn __fetch_messages_kp(
    conn_pool: &mut network::Connections,
    correlation_id: i32,
    client_id: &str,
    max_wait_ms: i32,
    min_bytes: i32,
    broker_partitions: HashMap<&str, Vec<(&str, i32, i64, i32)>>,
) -> Result<Vec<crate::protocol::fetch::OwnedFetchResponse>> {
    const API: &str = "Fetch";
    let started = Instant::now();
    let mut responses = Vec::with_capacity(broker_partitions.len());

    for (broker_host, wanted) in broker_partitions {
        let conn = conn_pool
            .get_conn(broker_host, started)
            .map_err(|e| e.with_broker_context(broker_host, API))?;

        // NOTE(review): `-1` and `0x7fffffff` are presumably the replica id
        // and the overall response byte limit — confirm against
        // `build_fetch_request`.
        let (header, request) = crate::protocol::fetch::build_fetch_request(
            correlation_id,
            client_id,
            -1,
            max_wait_ms,
            min_bytes,
            0x7fffffff,
            &wanted,
        );

        __kp_send_request(conn, &header, &request, crate::protocol::API_VERSION_FETCH)
            .map_err(|e| e.with_broker_context(broker_host, API))?;
        let raw = __kp_get_response::<kafka_protocol::messages::FetchResponse>(
            conn,
            crate::protocol::API_VERSION_FETCH,
        )
        .map_err(|e| e.with_broker_context(broker_host, API))?;

        responses.push(crate::protocol::fetch::convert_fetch_response(raw, correlation_id)?);
    }

    Ok(responses)
}
/// Sends one produce request per broker in `broker_msgs`.
///
/// Each map value lists `(topic, partition, key, value)` tuples for that
/// broker. With `no_acks` set, requests are only sent and the returned list
/// is empty; otherwise each broker's response is read and its confirmations
/// are appended to the result.
///
/// # Errors
///
/// Stops at the first connection, send or decode failure and returns it with
/// broker context for the "Produce" API.
#[allow(clippy::similar_names)]
fn __produce_messages_kp(
    conn_pool: &mut network::Connections,
    correlation_id: i32,
    client_id: &str,
    required_acks: i16,
    ack_timeout_ms: i32,
    compression: Compression,
    broker_msgs: HashMap<&str, Vec<(&str, i32, Option<&[u8]>, Option<&[u8]>)>>,
    no_acks: bool,
) -> Result<Vec<ProduceConfirm>> {
    let now = Instant::now();
    let mut confirms: Vec<ProduceConfirm> = Vec::new();

    for (host, msgs) in broker_msgs {
        let conn = conn_pool
            .get_conn(host, now)
            .map_err(|e| e.with_broker_context(host, "Produce"))?;
        let (header, request) = crate::protocol::produce::build_produce_request(
            correlation_id,
            client_id,
            required_acks,
            ack_timeout_ms,
            compression,
            &msgs,
        );
        __kp_send_request(conn, &header, &request, crate::protocol::API_VERSION_PRODUCE)
            .map_err(|e| e.with_broker_context(host, "Produce"))?;

        // Fire-and-forget: no response is read for this broker.
        if no_acks {
            continue;
        }

        let kp_resp = __kp_get_response::<kafka_protocol::messages::ProduceResponse>(
            conn,
            crate::protocol::API_VERSION_PRODUCE,
        )
        .map_err(|e| e.with_broker_context(host, "Produce"))?;
        let our_resp = crate::protocol::produce::convert_produce_response(kp_resp, correlation_id);
        confirms.extend(our_resp.get_response());
    }

    Ok(confirms)
}
/// Reads four bytes from the connection and interprets them as a big-endian
/// `i32` (the size prefix of a response).
///
/// The value is returned as read; callers must deal with negative sizes.
///
/// # Errors
///
/// Propagates the error of `read_exact`.
fn __get_response_size(conn: &mut network::KafkaConnection) -> Result<i32> {
let mut buf = [0u8; 4];
conn.read_exact(&mut buf)?;
Ok(i32::from_be_bytes(buf))
}
fn __kp_send_request(
conn: &mut network::KafkaConnection,
header: &kafka_protocol::messages::RequestHeader,
body: &impl kafka_protocol::protocol::Encodable,
api_version: i16,
) -> Result<()> {
use bytes::BytesMut;
use kafka_protocol::protocol::Encodable;
let mut header_buf = BytesMut::new();
header.encode(&mut header_buf, api_version).map_err(|_| Error::CodecError)?;
let mut body_buf = BytesMut::new();
body.encode(&mut body_buf, api_version).map_err(|_| Error::CodecError)?;
let total_len = (header_buf.len() + body_buf.len()) as i32;
let mut out = BytesMut::with_capacity(4 + total_len as usize);
out.extend_from_slice(&total_len.to_be_bytes());
out.extend_from_slice(&header_buf);
out.extend_from_slice(&body_buf);
trace!("__kp_send_request: sending {} bytes", out.len());
conn.send(&out)?;
Ok(())
}
fn __kp_get_response<R: kafka_protocol::protocol::Decodable>(
conn: &mut network::KafkaConnection,
api_version: i16,
) -> Result<R> {
use bytes::Bytes;
use kafka_protocol::messages::ResponseHeader;
use kafka_protocol::protocol::Decodable;
let size = __get_response_size(conn)?;
let resp_bytes = conn.read_exact_alloc(size as u64)?;
let mut bytes = Bytes::from(resp_bytes);
let _resp_header = ResponseHeader::decode(&mut bytes, api_version).map_err(|_| Error::CodecError)?;
R::decode(&mut bytes, api_version).map_err(|_| Error::CodecError)
}
/// Blocks the current thread for the configured retry backoff time.
fn __retry_sleep(cfg: &ClientConfig) {
thread::sleep(cfg.retry_backoff_time);
}
fn __get_group_coordinator_kp<'a>(
group: &str,
state: &'a mut state::ClientState,
conn_pool: &mut network::Connections,
config: &ClientConfig,
now: Instant,
) -> Result<&'a str> {
if let Some(host) = state.group_coordinator(group) {
return Ok(unsafe { mem::transmute(host) });
}
let correlation_id = state.next_correlation_id();
let (header, request) =
crate::protocol::consumer::build_find_coordinator_request(correlation_id, &config.client_id, group);
let mut attempt = 1;
loop {
let conn = conn_pool.get_conn_any(now).expect("available connection");
debug!(
"get_group_coordinator_kp: asking for coordinator of '{}' on: {:?}",
group, conn
);
__kp_send_request(conn, &header, &request, crate::protocol::API_VERSION_FIND_COORDINATOR)
.map_err(|e| e.with_broker_context("any", "FindCoordinator"))?;
let kp_resp = __kp_get_response::<kafka_protocol::messages::FindCoordinatorResponse>(
conn,
crate::protocol::API_VERSION_FIND_COORDINATOR,
).map_err(|e| e.with_broker_context("any", "FindCoordinator"))?;
let r = crate::protocol::consumer::convert_find_coordinator_response(kp_resp, correlation_id);
let retry_code = match r.error {
0 => {
let gc = protocol::consumer::GroupCoordinatorResponse {
header: protocol::HeaderResponse { correlation: correlation_id },
error: r.error,
broker_id: r.broker_id,
port: r.port,
host: r.host,
};
return Ok(state.set_group_coordinator(group, &gc));
}
e if KafkaCode::from_protocol(e) == Some(KafkaCode::GroupCoordinatorNotAvailable) => e,
e => {
if let Some(code) = KafkaCode::from_protocol(e) {
return Err(Error::Kafka(code));
}
return Err(Error::Kafka(KafkaCode::Unknown));
}
};
if attempt < config.retry_max_attempts {
debug!(
"get_group_coordinator_kp: will retry request (c: {}) due to: {:?}",
correlation_id, retry_code
);
attempt += 1;
__retry_sleep(config);
} else {
return Err(Error::Kafka(
KafkaCode::from_protocol(retry_code).unwrap_or(KafkaCode::Unknown),
));
}
}
}
/// Sends an OffsetCommit request for `offsets` to the coordinator of `group`,
/// retrying on retriable error codes.
///
/// Each attempt resolves the coordinator, sends the request and inspects the
/// per-partition error codes of the response. `GroupLoadInProgress` and
/// `NotCoordinatorForGroup` trigger a retry after the configured backoff (the
/// latter also drops the cached coordinator), up to
/// `config.retry_max_attempts` attempts.
///
/// # Errors
///
/// Returns coordinator lookup, connection, send and decode errors (the latter
/// three with "OffsetCommit" context), `Error::Kafka` for the first
/// non-retriable partition error code, and `Error::Kafka` with the last
/// retriable code once the attempts are exhausted.
fn __commit_offsets_kp(
offsets: &[(&str, i32, i64, Option<&str>)],
group: &str,
correlation_id: i32,
client_id: &str,
state: &mut state::ClientState,
conn_pool: &mut network::Connections,
config: &ClientConfig,
) -> Result<()> {
let mut attempt = 1;
loop {
let now = Instant::now();
let host = __get_group_coordinator_kp(group, state, conn_pool, config, now)?;
debug!("commit_offsets_kp: sending request to: {}", host);
let conn = conn_pool.get_conn(host, now)
.map_err(|e| e.with_broker_context(host, "OffsetCommit"))?;
// NOTE(review): `-1, "", -1` are presumably generation id, member id and
// retention time — confirm against `build_offset_commit_request`.
let (header, request) = crate::protocol::consumer::build_offset_commit_request(
correlation_id,
client_id,
group,
-1, "", -1, offsets,
);
__kp_send_request(conn, &header, &request, crate::protocol::API_VERSION_OFFSET_COMMIT)
.map_err(|e| e.with_broker_context(host, "OffsetCommit"))?;
let kp_resp = __kp_get_response::<kafka_protocol::messages::OffsetCommitResponse>(
conn,
crate::protocol::API_VERSION_OFFSET_COMMIT,
).map_err(|e| e.with_broker_context(host, "OffsetCommit"))?;
let our_resp = crate::protocol::consumer::convert_offset_commit_response(kp_resp, correlation_id);
let mut retry_code = None;
// Scan all partitions; the first retriable code ends the scan, the first
// other error code ends the whole call.
'rproc: for tp in &our_resp.topic_partitions {
for p in &tp.partitions {
match KafkaCode::from_protocol(p.error) {
None => {}
Some(e @ KafkaCode::GroupLoadInProgress) => {
retry_code = Some(e);
break 'rproc;
}
Some(e @ KafkaCode::NotCoordinatorForGroup) => {
// Forget the coordinator so the next attempt looks it up again.
debug!("commit_offsets_kp: resetting group coordinator for '{}'", group);
state.remove_group_coordinator(group);
retry_code = Some(e);
break 'rproc;
}
Some(code) => return Err(Error::Kafka(code)),
}
}
}
match retry_code {
Some(e) => {
if attempt < config.retry_max_attempts {
debug!(
"commit_offsets_kp: will retry request (c: {}) due to: {:?}",
correlation_id, e
);
attempt += 1;
__retry_sleep(config);
} else {
return Err(Error::Kafka(e));
}
}
None => return Ok(()),
}
}
}
/// Sends an OffsetFetch request for `partitions` to the coordinator of
/// `group` and returns the reported offsets grouped by topic, retrying on
/// retriable error codes.
///
/// Each attempt resolves the coordinator, sends the request and converts the
/// response. `GroupLoadInProgress` and `NotCoordinatorForGroup` trigger a
/// retry after the configured backoff (the latter also drops the cached
/// coordinator), up to `config.retry_max_attempts` attempts. Results
/// collected during an attempt that ends in a retry are discarded.
///
/// # Errors
///
/// Returns coordinator lookup, connection, send and decode errors (the latter
/// three with "OffsetFetch" context), `Error::Kafka` for the first
/// non-retriable partition error code, and `Error::Kafka` with the last
/// retriable code once the attempts are exhausted.
fn __fetch_group_offsets_kp(
partitions: &[(&str, i32)],
group: &str,
correlation_id: i32,
client_id: &str,
state: &mut state::ClientState,
conn_pool: &mut network::Connections,
config: &ClientConfig,
) -> Result<HashMap<String, Vec<PartitionOffset>>> {
let mut attempt = 1;
loop {
let now = Instant::now();
let host = __get_group_coordinator_kp(group, state, conn_pool, config, now)?;
debug!("fetch_group_offsets_kp: sending request to: {}", host);
let conn = conn_pool.get_conn(host, now)
.map_err(|e| e.with_broker_context(host, "OffsetFetch"))?;
let (header, request) = crate::protocol::consumer::build_offset_fetch_request(
correlation_id,
client_id,
group,
partitions,
);
__kp_send_request(conn, &header, &request, crate::protocol::API_VERSION_OFFSET_FETCH)
.map_err(|e| e.with_broker_context(host, "OffsetFetch"))?;
let kp_resp = __kp_get_response::<kafka_protocol::messages::OffsetFetchResponse>(
conn,
crate::protocol::API_VERSION_OFFSET_FETCH,
).map_err(|e| e.with_broker_context(host, "OffsetFetch"))?;
let our_resp = crate::protocol::consumer::convert_offset_fetch_response(kp_resp, correlation_id);
let mut retry_code = None;
// Rebuilt from scratch on every attempt.
let mut topic_map = HashMap::with_capacity(our_resp.topic_partitions.len());
'rproc: for tp in our_resp.topic_partitions {
let mut partition_offsets = Vec::with_capacity(tp.partitions.len());
for p in tp.partitions {
match KafkaCode::from_protocol(p.error) {
None => {
partition_offsets.push(PartitionOffset {
offset: p.offset,
partition: p.partition,
});
}
Some(e @ KafkaCode::GroupLoadInProgress) => {
retry_code = Some(e);
break 'rproc;
}
Some(e @ KafkaCode::NotCoordinatorForGroup) => {
// Forget the coordinator so the next attempt looks it up again.
debug!("fetch_group_offsets_kp: resetting group coordinator for '{}'", group);
state.remove_group_coordinator(group);
retry_code = Some(e);
break 'rproc;
}
Some(e) => return Err(Error::Kafka(e)),
}
}
topic_map.insert(tp.topic, partition_offsets);
}
match retry_code {
Some(e) => {
if attempt < config.retry_max_attempts {
debug!(
"fetch_group_offsets_kp: will retry request (c: {}) due to: {:?}",
correlation_id, e
);
attempt += 1;
__retry_sleep(config);
} else {
return Err(Error::Kafka(e));
}
}
None => return Ok(topic_map),
}
}
}