use std;
use std::collections::hash_map;
use std::collections::hash_map::HashMap;
use std::io::Cursor;
use std::iter::Iterator;
use std::thread;
use std::time::{Duration, Instant};
pub use crate::compression::Compression;
use crate::protocol::list_offset::ListOffsetVersion;
pub use crate::utils::PartitionOffset;
use crate::utils::TimestampedPartitionOffset;
#[cfg(feature = "producer_timestamp")]
pub use crate::protocol::produce::ProducerTimestamp;
#[cfg(not(feature = "producer_timestamp"))]
use crate::protocol::produce::ProducerTimestamp;
#[cfg(feature = "security")]
pub use self::tls::{SecurityConfig, TlsConnector, TlsConnectorBuilder};
use crate::codecs::{FromByte, ToByte};
use crate::error::{Error, KafkaCode, Result};
use crate::protocol::{self, ResponseParser};
use crate::client_internals::KafkaClientInternals;
pub mod metadata;
mod network;
mod state;
#[cfg(feature = "security")]
mod tls;
pub mod sasl;
pub use self::sasl::{SaslConfig, SaslPlainConfig};
/// Re-exports of the fetch-response types as this module's public
/// `fetch` API surface.
pub mod fetch {
    pub use crate::protocol::fetch::{Data, Message, Partition, Response, Topic};
}
/// Read/write timeout (in seconds) applied to broker connections by
/// default; a value of zero stands for "no timeout".
const DEFAULT_CONNECTION_RW_TIMEOUT_SECS: u64 = 120;

/// Translates `DEFAULT_CONNECTION_RW_TIMEOUT_SECS` into the
/// `Option<Duration>` form expected by the connection pool, mapping the
/// zero sentinel to `None`.
fn default_conn_rw_timeout() -> Option<Duration> {
    if DEFAULT_CONNECTION_RW_TIMEOUT_SECS == 0 {
        None
    } else {
        Some(Duration::from_secs(DEFAULT_CONNECTION_RW_TIMEOUT_SECS))
    }
}
/// Compression applied to outgoing messages unless configured otherwise.
pub const DEFAULT_COMPRESSION: Compression = Compression::NONE;
/// Default maximum time (millis) the broker may delay a fetch response
/// while gathering the requested minimum number of bytes.
pub const DEFAULT_FETCH_MAX_WAIT_TIME_MILLIS: u64 = 100;
/// Default minimum number of bytes a broker should collect before
/// answering a fetch request.
pub const DEFAULT_FETCH_MIN_BYTES: i32 = 4096;
/// Default cap on the number of bytes fetched per topic partition (32 KiB).
pub const DEFAULT_FETCH_MAX_BYTES_PER_PARTITION: i32 = 32 * 1024;
/// Whether CRCs of fetched messages are validated by default.
pub const DEFAULT_FETCH_CRC_VALIDATION: bool = true;
/// Default consumer-group offset storage; `None` makes offset
/// commit/fetch operations fail with `UnsetOffsetStorage` until a
/// storage is configured.
pub const DEFAULT_GROUP_OFFSET_STORAGE: Option<GroupOffsetStorage> = None;
/// Default pause (millis) between retries of transiently failed requests.
pub const DEFAULT_RETRY_BACKOFF_TIME_MILLIS: u64 = 100;
/// Default retry budget, sized so retrying spans roughly two minutes at
/// the default backoff time.
pub const DEFAULT_RETRY_MAX_ATTEMPTS: u32 = 120_000 / DEFAULT_RETRY_BACKOFF_TIME_MILLIS as u32;
/// Default time (millis) after which an idle broker connection is closed.
pub const DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS: u64 = 540_000;
/// Default producer timestamp attached to produced messages (none).
pub(crate) const DEFAULT_PRODUCER_TIMESTAMP: Option<ProducerTimestamp> = None;
/// Client for communicating with a Kafka cluster.
#[derive(Debug)]
pub struct KafkaClient {
    // Tunable client behavior (compression, fetch limits, retries, ...).
    config: ClientConfig,
    // Pool of re-usable broker connections.
    conn_pool: network::Connections,
    // Loaded cluster metadata plus correlation-id / coordinator state.
    state: state::ClientState,
}
/// Bundled configuration values backing a `KafkaClient`.
#[derive(Debug)]
struct ClientConfig {
    // Client id sent along with requests (may be empty).
    client_id: String,
    // Seed broker addresses used for metadata discovery.
    hosts: Vec<String>,
    // Compression applied when producing messages.
    compression: Compression,
    // Max broker-side wait time for fetch requests, in millis.
    fetch_max_wait_time: i32,
    // Minimum bytes a broker should gather before answering a fetch.
    fetch_min_bytes: i32,
    // Per-partition cap on fetched bytes.
    fetch_max_bytes_per_partition: i32,
    // Whether to validate the CRC of fetched messages.
    fetch_crc_validation: bool,
    // Where consumer-group offsets live; `None` disables offset
    // commit/fetch operations.
    offset_storage: Option<GroupOffsetStorage>,
    // Pause between retries of transiently failed requests.
    retry_backoff_time: Duration,
    // Upper bound on retry attempts for transiently failed requests.
    retry_max_attempts: u32,
    // Optional timestamp for produced messages; only read when the
    // `producer_timestamp` feature is enabled, hence allow(unused).
    #[allow(unused)]
    producer_timestamp: Option<ProducerTimestamp>,
}
/// The position within a topic partition from which to fetch offsets.
#[derive(Debug, Copy, Clone)]
pub enum FetchOffset {
    /// The oldest offset still available on the broker.
    Earliest,
    /// The offset right after the most recently written message.
    Latest,
    /// Offsets at the given time (milliseconds since the UNIX epoch).
    ByTime(i64),
}
impl FetchOffset {
    /// Maps this logical position to the sentinel understood by the
    /// Kafka list-offsets API: -2 for earliest, -1 for latest, or the
    /// caller-supplied timestamp.
    fn to_kafka_value(self) -> i64 {
        match self {
            FetchOffset::ByTime(millis) => millis,
            FetchOffset::Latest => -1,
            FetchOffset::Earliest => -2,
        }
    }
}
/// The place where consumer-group offsets are kept.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum GroupOffsetStorage {
    /// Offsets kept in Zookeeper.
    Zookeeper,
    /// Offsets kept in Kafka itself.
    Kafka,
}
impl GroupOffsetStorage {
    /// Protocol version used for offset-fetch requests against this
    /// storage backend.
    fn offset_fetch_version(self) -> protocol::OffsetFetchVersion {
        if let GroupOffsetStorage::Zookeeper = self {
            protocol::OffsetFetchVersion::V0
        } else {
            protocol::OffsetFetchVersion::V5
        }
    }
    /// Protocol version used for offset-commit requests against this
    /// storage backend.
    fn offset_commit_version(self) -> protocol::OffsetCommitVersion {
        if let GroupOffsetStorage::Zookeeper = self {
            protocol::OffsetCommitVersion::V0
        } else {
            protocol::OffsetCommitVersion::V7
        }
    }
}
/// Identifies a topic partition whose consumer-group offset is to be
/// fetched.
#[derive(Debug)]
pub struct FetchGroupOffset<'a> {
    /// The topic to fetch the group offset for.
    pub topic: &'a str,
    /// The partition to fetch the group offset for.
    pub partition: i32,
}
impl<'a> FetchGroupOffset<'a> {
    /// Convenience constructor.
    #[inline]
    #[must_use]
    pub fn new(topic: &'a str, partition: i32) -> Self {
        Self { topic, partition }
    }
}
impl<'a> AsRef<FetchGroupOffset<'a>> for FetchGroupOffset<'a> {
    fn as_ref(&self) -> &Self {
        self
    }
}
/// A consumer-group offset to be committed for a topic partition.
#[derive(Debug)]
pub struct CommitOffset<'a> {
    /// The offset to commit.
    pub offset: i64,
    /// The topic the offset belongs to.
    pub topic: &'a str,
    /// The partition the offset belongs to.
    pub partition: i32,
}
impl<'a> CommitOffset<'a> {
    /// Convenience constructor.
    #[must_use]
    pub fn new(topic: &'a str, partition: i32, offset: i64) -> Self {
        Self {
            topic,
            partition,
            offset,
        }
    }
}
impl<'a> AsRef<CommitOffset<'a>> for CommitOffset<'a> {
    fn as_ref(&self) -> &Self {
        self
    }
}
/// The number of acknowledgements a produce request waits for. The
/// discriminants are the wire values sent to the broker (the enum is
/// cast to `i16` when building the request).
#[derive(Debug, Copy, Clone)]
pub enum RequiredAcks {
    /// Wait for no acknowledgement (fire-and-forget; wire value 0).
    None = 0,
    /// Wait for one acknowledgement (wire value 1).
    One = 1,
    /// Wait for acknowledgement by all in-sync replicas (wire value -1).
    All = -1,
}
/// A message (optional key and value) to be produced into a topic
/// partition.
#[derive(Debug)]
pub struct ProduceMessage<'a, 'b> {
    /// Optional key data of the message.
    pub key: Option<&'b [u8]>,
    /// Optional value data of the message.
    pub value: Option<&'b [u8]>,
    /// The destination topic.
    pub topic: &'a str,
    /// The destination partition.
    pub partition: i32,
}
impl<'a, 'b> AsRef<ProduceMessage<'a, 'b>> for ProduceMessage<'a, 'b> {
    fn as_ref(&self) -> &Self {
        self
    }
}
impl<'a, 'b> ProduceMessage<'a, 'b> {
    /// Convenience constructor.
    #[must_use]
    pub fn new(
        topic: &'a str,
        partition: i32,
        key: Option<&'b [u8]>,
        value: Option<&'b [u8]>,
    ) -> Self {
        Self {
            topic,
            partition,
            key,
            value,
        }
    }
}
/// Input for fetching messages of a single topic partition.
#[derive(Debug)]
pub struct FetchPartition<'a> {
    /// The topic to fetch messages from.
    pub topic: &'a str,
    /// The offset at which to start fetching.
    pub offset: i64,
    /// The partition to fetch messages from.
    pub partition: i32,
    /// Per-request override of the maximum bytes to fetch; values <= 0
    /// fall back to the client-wide per-partition default.
    pub max_bytes: i32,
}
impl<'a> FetchPartition<'a> {
    /// Creates a fetch input with no per-request byte limit.
    #[must_use]
    pub fn new(topic: &'a str, partition: i32, offset: i64) -> Self {
        Self {
            topic,
            offset,
            partition,
            max_bytes: -1,
        }
    }
    /// Builder-style setter for the per-request byte limit.
    #[must_use]
    pub fn with_max_bytes(mut self, max_bytes: i32) -> Self {
        self.max_bytes = max_bytes;
        self
    }
}
impl<'a> AsRef<FetchPartition<'a>> for FetchPartition<'a> {
    fn as_ref(&self) -> &Self {
        self
    }
}
/// Broker confirmations for messages produced to one topic.
#[derive(Debug)]
pub struct ProduceConfirm {
    /// The topic the messages were sent to.
    pub topic: String,
    /// The per-partition responses for this topic.
    pub partition_confirms: Vec<ProducePartitionConfirm>,
}
/// A broker confirmation for messages produced to one partition.
#[derive(Debug)]
pub struct ProducePartitionConfirm {
    /// The offset reported by the broker on success, or the Kafka error
    /// code reported for this partition.
    pub offset: std::result::Result<i64, KafkaCode>,
    /// The partition this confirmation refers to.
    pub partition: i32,
}
impl KafkaClient {
    /// Creates a new client for the given seed hosts. All tunables start
    /// at their `DEFAULT_*` values and no metadata is loaded yet; call
    /// `load_metadata_all` / `load_metadata` before fetching/producing.
    #[must_use]
    pub fn new(hosts: Vec<String>) -> KafkaClient {
        KafkaClient {
            config: ClientConfig {
                client_id: String::new(),
                hosts,
                compression: DEFAULT_COMPRESSION,
                // The compiled-in default is known to fit into i32 millis.
                fetch_max_wait_time: protocol::to_millis_i32(Duration::from_millis(
                    DEFAULT_FETCH_MAX_WAIT_TIME_MILLIS,
                ))
                .expect("invalid default-fetch-max-time-millis"),
                fetch_min_bytes: DEFAULT_FETCH_MIN_BYTES,
                fetch_max_bytes_per_partition: DEFAULT_FETCH_MAX_BYTES_PER_PARTITION,
                fetch_crc_validation: DEFAULT_FETCH_CRC_VALIDATION,
                offset_storage: DEFAULT_GROUP_OFFSET_STORAGE,
                retry_backoff_time: Duration::from_millis(DEFAULT_RETRY_BACKOFF_TIME_MILLIS),
                retry_max_attempts: DEFAULT_RETRY_MAX_ATTEMPTS,
                producer_timestamp: DEFAULT_PRODUCER_TIMESTAMP,
            },
            conn_pool: network::Connections::new(
                String::new(),
                default_conn_rw_timeout(),
                Duration::from_millis(DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS),
            ),
            state: state::ClientState::new(),
        }
    }
    /// Like `new`, but establishes broker connections through the given
    /// TLS security configuration.
    #[cfg(feature = "security")]
    #[must_use]
    pub fn new_secure(hosts: Vec<String>, security: SecurityConfig) -> KafkaClient {
        KafkaClient {
            config: ClientConfig {
                client_id: String::new(),
                hosts,
                compression: DEFAULT_COMPRESSION,
                fetch_max_wait_time: protocol::to_millis_i32(Duration::from_millis(
                    DEFAULT_FETCH_MAX_WAIT_TIME_MILLIS,
                ))
                .expect("invalid default-fetch-max-time-millis"),
                fetch_min_bytes: DEFAULT_FETCH_MIN_BYTES,
                fetch_max_bytes_per_partition: DEFAULT_FETCH_MAX_BYTES_PER_PARTITION,
                fetch_crc_validation: DEFAULT_FETCH_CRC_VALIDATION,
                offset_storage: DEFAULT_GROUP_OFFSET_STORAGE,
                retry_backoff_time: Duration::from_millis(DEFAULT_RETRY_BACKOFF_TIME_MILLIS),
                retry_max_attempts: DEFAULT_RETRY_MAX_ATTEMPTS,
                producer_timestamp: DEFAULT_PRODUCER_TIMESTAMP,
            },
            conn_pool: network::Connections::new_with_security(
                String::new(),
                default_conn_rw_timeout(),
                Duration::from_millis(DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS),
                Some(security),
            ),
            state: state::ClientState::new(),
        }
    }
    /// The seed hosts this client was created with.
    #[inline]
    #[must_use]
    pub fn hosts(&self) -> &[String] {
        &self.config.hosts
    }
    /// Sets the client id sent along with all requests (also propagated
    /// to the connection pool).
    pub fn set_client_id(&mut self, client_id: String) {
        self.conn_pool.set_client_id(client_id.clone());
        self.config.client_id = client_id;
    }
    /// Sets the SASL configuration used when authenticating connections.
    pub fn set_sasl_config(&mut self, sasl: Option<SaslConfig>) {
        self.conn_pool.set_sasl_config(sasl);
    }
    /// The currently configured client id.
    #[must_use]
    pub fn client_id(&self) -> &str {
        &self.config.client_id
    }
    /// Sets the compression used when producing messages.
    #[inline]
    pub fn set_compression(&mut self, compression: Compression) {
        self.config.compression = compression;
    }
    /// The currently configured compression.
    #[inline]
    #[must_use]
    pub fn compression(&self) -> Compression {
        self.config.compression
    }
    /// Sets the broker-side maximum wait time for fetch requests; fails
    /// if the duration does not fit into i32 milliseconds.
    #[inline]
    pub fn set_fetch_max_wait_time(&mut self, max_wait_time: Duration) -> Result<()> {
        self.config.fetch_max_wait_time = protocol::to_millis_i32(max_wait_time)?;
        Ok(())
    }
    /// The currently configured fetch max-wait time.
    #[inline]
    #[must_use]
    pub fn fetch_max_wait_time(&self) -> Duration {
        // Non-negative by construction (validated via to_millis_i32).
        Duration::from_millis(self.config.fetch_max_wait_time as u64)
    }
    /// Sets the minimum bytes a broker should gather before answering a
    /// fetch request.
    #[inline]
    pub fn set_fetch_min_bytes(&mut self, min_bytes: i32) {
        self.config.fetch_min_bytes = min_bytes;
    }
    /// The currently configured fetch minimum bytes.
    #[inline]
    #[must_use]
    pub fn fetch_min_bytes(&self) -> i32 {
        self.config.fetch_min_bytes
    }
    /// Sets the default per-partition cap on fetched bytes.
    #[inline]
    pub fn set_fetch_max_bytes_per_partition(&mut self, max_bytes: i32) {
        self.config.fetch_max_bytes_per_partition = max_bytes;
    }
    /// The currently configured per-partition fetch byte cap.
    #[inline]
    #[must_use]
    pub fn fetch_max_bytes_per_partition(&self) -> i32 {
        self.config.fetch_max_bytes_per_partition
    }
    /// Enables or disables CRC validation of fetched messages.
    #[inline]
    pub fn set_fetch_crc_validation(&mut self, validate_crc: bool) {
        self.config.fetch_crc_validation = validate_crc;
    }
    /// Whether CRC validation of fetched messages is enabled.
    #[inline]
    #[must_use]
    pub fn fetch_crc_validation(&self) -> bool {
        self.config.fetch_crc_validation
    }
    /// Sets where consumer-group offsets are stored; `None` disables
    /// offset commit/fetch operations.
    #[inline]
    pub fn set_group_offset_storage(&mut self, storage: Option<GroupOffsetStorage>) {
        self.config.offset_storage = storage;
    }
    /// The currently configured offset storage.
    #[must_use]
    pub fn group_offset_storage(&self) -> Option<GroupOffsetStorage> {
        self.config.offset_storage
    }
    /// Sets the pause between retries of transiently failed requests.
    #[inline]
    pub fn set_retry_backoff_time(&mut self, time: Duration) {
        self.config.retry_backoff_time = time;
    }
    /// The currently configured retry backoff time.
    #[must_use]
    pub fn retry_backoff_time(&self) -> Duration {
        self.config.retry_backoff_time
    }
    /// Sets the upper bound on retry attempts for transiently failed
    /// requests.
    #[inline]
    pub fn set_retry_max_attempts(&mut self, attempts: u32) {
        self.config.retry_max_attempts = attempts;
    }
    /// The currently configured retry budget.
    #[inline]
    #[must_use]
    pub fn retry_max_attempts(&self) -> u32 {
        self.config.retry_max_attempts
    }
    /// Sets the idle timeout after which pooled connections are closed.
    #[inline]
    pub fn set_connection_idle_timeout(&mut self, timeout: Duration) {
        self.conn_pool.set_idle_timeout(timeout);
    }
    /// The currently configured connection idle timeout.
    #[inline]
    #[must_use]
    pub fn connection_idle_timeout(&self) -> Duration {
        self.conn_pool.idle_timeout()
    }
    /// Sets the timestamp attached to produced messages.
    #[cfg(feature = "producer_timestamp")]
    #[inline]
    pub fn set_producer_timestamp(&mut self, producer_timestamp: Option<ProducerTimestamp>) {
        self.config.producer_timestamp = producer_timestamp;
    }
    /// The currently configured producer timestamp.
    #[cfg(feature = "producer_timestamp")]
    #[inline]
    #[must_use]
    pub fn producer_timestamp(&self) -> Option<ProducerTimestamp> {
        self.config.producer_timestamp
    }
    /// A view over the currently loaded topic metadata.
    #[inline]
    #[must_use]
    pub fn topics(&self) -> metadata::Topics<'_> {
        metadata::Topics::new(self)
    }
    /// Clears and (re-)loads metadata for all topics known to the
    /// cluster.
    #[inline]
    pub fn load_metadata_all(&mut self) -> Result<()> {
        self.reset_metadata();
        // An empty topic list requests metadata for all topics.
        self.load_metadata::<&str>(&[])
    }
    /// Loads metadata for the given topics, merging it into the current
    /// client state.
    #[inline]
    pub fn load_metadata<T: AsRef<str>>(&mut self, topics: &[T]) -> Result<()> {
        let resp = self.fetch_metadata(topics)?;
        self.state.update_metadata(resp);
        Ok(())
    }
    /// Forgets all loaded metadata.
    #[inline]
    pub fn reset_metadata(&mut self) {
        self.state.clear_metadata();
    }
    /// Requests metadata for the given topics, trying each configured
    /// host in turn and returning the first successful response. If all
    /// hosts fail, the last error is returned (or `NoHostReachable`
    /// when no host was configured at all).
    fn fetch_metadata<T: AsRef<str>>(
        &mut self,
        topics: &[T],
    ) -> Result<protocol::MetadataResponse> {
        let correlation = self.state.next_correlation_id();
        let now = Instant::now();
        let mut last_error: Option<Error> = None;
        for host in &self.config.hosts {
            debug!("fetch_metadata: requesting metadata from {}", host);
            match self.conn_pool.get_conn(host, now) {
                Ok(conn) => {
                    let req =
                        protocol::MetadataRequest::new(correlation, &self.config.client_id, topics);
                    match __send_request(conn, req) {
                        Ok(_) => return __get_response::<protocol::MetadataResponse>(conn),
                        Err(e) => {
                            debug!(
                                "fetch_metadata: failed to request metadata from {}: {}",
                                host, e
                            );
                            last_error = Some(e);
                        }
                    }
                }
                Err(e) => {
                    debug!("fetch_metadata: failed to connect to {}: {}", host, e);
                    last_error = Some(e);
                }
            }
        }
        Err(last_error.unwrap_or(Error::NoHostReachable))
    }
    /// Fetches the offsets for the given topics at the given logical
    /// position, discarding the timestamps reported by `list_offsets`.
    pub fn fetch_offsets<T: AsRef<str>>(
        &mut self,
        topics: &[T],
        offset: FetchOffset,
    ) -> Result<HashMap<String, Vec<PartitionOffset>>> {
        let res = self.list_offsets(topics, offset)?;
        Ok(res
            .into_iter()
            .map(|(topic, offs)| {
                (
                    topic,
                    offs.into_iter()
                        .map(|o| PartitionOffset {
                            partition: o.partition,
                            offset: o.offset,
                        })
                        .collect(),
                )
            })
            .collect())
    }
    /// Lists (timestamped) offsets for the given topics at the given
    /// logical position. Builds one list-offsets request per broker
    /// currently leading any of the relevant partitions; topics unknown
    /// to the loaded metadata are silently skipped.
    pub fn list_offsets<T: AsRef<str>>(
        &mut self,
        topics: &[T],
        offset: FetchOffset,
    ) -> Result<HashMap<String, Vec<TimestampedPartitionOffset>>> {
        let api_ver = ListOffsetVersion::V5;
        let time = offset.to_kafka_value();
        let n_topics = topics.len();
        let state = &mut self.state;
        let correlation = state.next_correlation_id();
        let config = &self.config;
        // One request per broker host, lazily created on first use.
        let mut reqs: HashMap<&str, protocol::ListOffsetsRequest<'_>> =
            HashMap::with_capacity(n_topics);
        for topic in topics {
            let topic = topic.as_ref();
            if let Some(ps) = state.partitions_for(topic) {
                for (id, host) in ps
                    .iter()
                    .filter_map(|(id, p)| p.broker(state).map(|b| (id, b.host())))
                {
                    let entry = reqs.entry(host).or_insert_with(|| {
                        protocol::ListOffsetsRequest::new(correlation, api_ver, &config.client_id)
                    });
                    entry.add(topic, id, time);
                }
            }
        }
        let now = Instant::now();
        let mut responses: HashMap<String, Vec<TimestampedPartitionOffset>> =
            HashMap::with_capacity(n_topics);
        for (host, req) in reqs {
            let resp = __send_receive::<_, protocol::ListOffsetsResponse>(
                &mut self.conn_pool,
                host,
                now,
                req,
            )?;
            for tp in resp.topics {
                // The Entry is kept alive so the topic name (its key)
                // can be recovered for error reporting without cloning.
                let mut entry = responses.entry(tp.topic);
                let mut new_resp_offsets = None;
                let mut err = None;
                {
                    // Push into the existing vec for an occupied entry,
                    // or into a fresh vec inserted only on success below.
                    let resp_offsets = match entry {
                        hash_map::Entry::Occupied(ref mut e) => e.get_mut(),
                        hash_map::Entry::Vacant(_) => {
                            new_resp_offsets = Some(Vec::new());
                            new_resp_offsets.as_mut().unwrap()
                        }
                    };
                    for p in tp.partitions {
                        let pto: TimestampedPartitionOffset = match p.to_offset() {
                            Ok(po) => po,
                            Err(code) => {
                                err = Some((p.partition, code));
                                break;
                            }
                        };
                        resp_offsets.push(pto);
                    }
                }
                if let Some((partition, code)) = err {
                    let topic = KafkaClient::get_key_from_entry(entry);
                    return Err(Error::TopicPartitionError {
                        topic_name: topic,
                        partition_id: partition,
                        error_code: code,
                    });
                }
                if let hash_map::Entry::Vacant(e) = entry {
                    e.insert(new_resp_offsets.unwrap());
                }
            }
        }
        Ok(responses)
    }
    /// Extracts the owned key out of a `HashMap` entry, regardless of
    /// whether it is occupied or vacant.
    fn get_key_from_entry<'a, K: 'a, V: 'a>(entry: hash_map::Entry<'a, K, V>) -> K {
        match entry {
            hash_map::Entry::Occupied(e) => e.remove_entry().0,
            hash_map::Entry::Vacant(e) => e.into_key(),
        }
    }
    /// Fetches the offsets of a single topic at the given logical
    /// position; an empty result is treated as the topic being unknown.
    pub fn fetch_topic_offsets<T: AsRef<str>>(
        &mut self,
        topic: T,
        offset: FetchOffset,
    ) -> Result<Vec<PartitionOffset>> {
        let topic = topic.as_ref();
        let mut m = self.fetch_offsets(&[topic], offset)?;
        let offs = m.remove(topic).unwrap_or_default();
        if offs.is_empty() {
            Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition))
        } else {
            Ok(offs)
        }
    }
    /// Fetches messages for the given topic partitions, grouping the
    /// inputs by the broker currently leading them (inputs whose leader
    /// is unknown are silently skipped). Per-partition byte limits come
    /// from the input when positive, otherwise from the client config.
    pub fn fetch_messages<'a, I, J>(&mut self, input: I) -> Result<Vec<fetch::Response>>
    where
        J: AsRef<FetchPartition<'a>>,
        I: IntoIterator<Item = J>,
    {
        let state = &mut self.state;
        let config = &self.config;
        let correlation = state.next_correlation_id();
        let mut reqs: HashMap<&str, protocol::FetchRequest<'_, '_>> = HashMap::new();
        for inp in input {
            let inp = inp.as_ref();
            if let Some(broker) = state.find_broker(inp.topic, inp.partition) {
                reqs.entry(broker)
                    .or_insert_with(|| {
                        protocol::FetchRequest::new_v11(
                            correlation,
                            &config.client_id,
                            config.fetch_max_wait_time,
                            config.fetch_min_bytes,
                            // No overall response size cap.
                            i32::MAX,
                            "",
                        )
                    })
                    .add(
                        inp.topic,
                        inp.partition,
                        inp.offset,
                        if inp.max_bytes > 0 {
                            inp.max_bytes
                        } else {
                            config.fetch_max_bytes_per_partition
                        },
                    );
            }
        }
        __fetch_messages(&mut self.conn_pool, config, reqs)
    }
    /// Convenience wrapper around `fetch_messages` for a single
    /// partition.
    pub fn fetch_messages_for_partition(
        &mut self,
        req: &FetchPartition<'_>,
    ) -> Result<Vec<fetch::Response>> {
        self.fetch_messages([req])
    }
    /// Produces the given messages, waiting for the requested number of
    /// acknowledgements within `ack_timeout`.
    pub fn produce_messages<'a, 'b, I, J>(
        &mut self,
        acks: RequiredAcks,
        ack_timeout: Duration,
        messages: I,
    ) -> Result<Vec<ProduceConfirm>>
    where
        J: AsRef<ProduceMessage<'a, 'b>>,
        I: IntoIterator<Item = J>,
    {
        self.internal_produce_messages(acks as i16, protocol::to_millis_i32(ack_timeout)?, messages)
    }
    /// Commits the given offsets for the given consumer group. Fails
    /// with `UnsetOffsetStorage` unless an offset storage is configured,
    /// and with `UnknownTopicOrPartition` if any input refers to a topic
    /// partition not present in the loaded metadata. An empty input is
    /// a successful no-op.
    pub fn commit_offsets<'a, J, I>(&mut self, group: &str, offsets: I) -> Result<()>
    where
        J: AsRef<CommitOffset<'a>>,
        I: IntoIterator<Item = J>,
    {
        match self.config.offset_storage {
            Some(offset_storage) => {
                let mut req = protocol::OffsetCommitRequest::new(
                    group,
                    offset_storage.offset_commit_version(),
                    self.state.next_correlation_id(),
                    &self.config.client_id,
                );
                for o in offsets {
                    let o = o.as_ref();
                    if self.state.contains_topic_partition(o.topic, o.partition) {
                        req.add(o.topic, o.partition, o.offset, "");
                    } else {
                        return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition));
                    }
                }
                if req.topic_partitions.is_empty() {
                    debug!("commit_offsets: no offsets provided");
                    Ok(())
                } else {
                    __commit_offsets(&req, &mut self.state, &mut self.conn_pool, &self.config)
                }
            }
            None => Err(Error::UnsetOffsetStorage),
        }
    }
    /// Convenience wrapper around `commit_offsets` for a single offset.
    pub fn commit_offset(
        &mut self,
        group: &str,
        topic: &str,
        partition: i32,
        offset: i64,
    ) -> Result<()> {
        self.commit_offsets(group, &[CommitOffset::new(topic, partition, offset)])
    }
    /// Fetches committed offsets of the given consumer group for the
    /// given topic partitions. Fails with `UnsetOffsetStorage` unless an
    /// offset storage is configured, and with `UnknownTopicOrPartition`
    /// if any input refers to an unknown topic partition.
    pub fn fetch_group_offsets<'a, J, I>(
        &mut self,
        group: &str,
        partitions: I,
    ) -> Result<HashMap<String, Vec<PartitionOffset>>>
    where
        J: AsRef<FetchGroupOffset<'a>>,
        I: IntoIterator<Item = J>,
    {
        match self.config.offset_storage {
            Some(offset_storage) => {
                let mut req = protocol::OffsetFetchRequest::new(
                    group,
                    offset_storage.offset_fetch_version(),
                    self.state.next_correlation_id(),
                    &self.config.client_id,
                );
                for p in partitions {
                    let p = p.as_ref();
                    if self.state.contains_topic_partition(p.topic, p.partition) {
                        req.add(p.topic, p.partition);
                    } else {
                        return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition));
                    }
                }
                __fetch_group_offsets(&req, &mut self.state, &mut self.conn_pool, &self.config)
            }
            None => Err(Error::UnsetOffsetStorage),
        }
    }
    /// Fetches committed offsets of the given consumer group for all
    /// partitions of a single topic.
    pub fn fetch_group_topic_offset(
        &mut self,
        group: &str,
        topic: &str,
    ) -> Result<Vec<PartitionOffset>> {
        match self.config.offset_storage {
            Some(offset_storage) => {
                let mut req = protocol::OffsetFetchRequest::new(
                    group,
                    offset_storage.offset_fetch_version(),
                    self.state.next_correlation_id(),
                    &self.config.client_id,
                );
                match self.state.partitions_for(topic) {
                    None => return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition)),
                    Some(tp) => {
                        for (id, _) in tp {
                            req.add(topic, id);
                        }
                    }
                }
                Ok(
                    __fetch_group_offsets(
                        &req,
                        &mut self.state,
                        &mut self.conn_pool,
                        &self.config,
                    )?
                    .remove(topic)
                    .unwrap_or_default(),
                )
            }
            None => Err(Error::UnsetOffsetStorage),
        }
    }
}
impl KafkaClientInternals for KafkaClient {
    /// Groups the given messages by the broker leading their topic
    /// partition and sends one produce request per broker. Fails with
    /// `UnknownTopicOrPartition` if any message targets a topic
    /// partition not present in the loaded metadata.
    fn internal_produce_messages<'a, 'b, I, J>(
        &mut self,
        required_acks: i16,
        ack_timeout: i32,
        messages: I,
    ) -> Result<Vec<ProduceConfirm>>
    where
        J: AsRef<ProduceMessage<'a, 'b>>,
        I: IntoIterator<Item = J>,
    {
        let state = &mut self.state;
        let correlation = state.next_correlation_id();
        let config = &self.config;
        let mut reqs: HashMap<&str, protocol::ProduceRequest<'_, '_>> = HashMap::new();
        for msg in messages {
            let msg = msg.as_ref();
            match state.find_broker(msg.topic, msg.partition) {
                None => return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition)),
                Some(broker) => reqs
                    .entry(broker)
                    .or_insert_with(|| {
                        protocol::ProduceRequest::new(
                            required_acks,
                            ack_timeout,
                            correlation,
                            &config.client_id,
                            config.compression,
                            #[cfg(feature = "producer_timestamp")]
                            config.producer_timestamp,
                        )
                    })
                    .add(msg.topic, msg.partition, msg.key, msg.value),
            }
        }
        // required_acks == 0 means the brokers will send no response.
        __produce_messages(&mut self.conn_pool, reqs, required_acks == 0)
    }
}
/// Resolves (and caches in `state`) the coordinator host for the given
/// consumer group, retrying up to `config.retry_max_attempts` times
/// while the broker reports `GroupCoordinatorNotAvailable`.
///
/// The returned host name borrows from `state`, where the resolved
/// coordinator is remembered for subsequent calls.
fn __get_group_coordinator<'a>(
    group: &str,
    state: &'a mut state::ClientState,
    conn_pool: &mut network::Connections,
    config: &ClientConfig,
    now: Instant,
) -> Result<&'a str> {
    if state.group_coordinator(group).is_none() {
        let correlation_id = state.next_correlation_id();
        let req = protocol::GroupCoordinatorRequest::new(group, correlation_id, &config.client_id);
        let mut attempt = 1;
        loop {
            // NOTE(review): panics when the pool cannot provide any
            // connection at all — confirm this invariant holds for all
            // callers.
            let conn = conn_pool.get_conn_any(now).expect("available connection");
            debug!(
                "get_group_coordinator: asking for coordinator of '{}' on: {:?}",
                group, conn
            );
            let r = __send_receive_conn::<_, protocol::GroupCoordinatorResponse>(conn, &req)?;
            let retry_code = match r.into_result() {
                Ok(r) => {
                    state.set_group_coordinator(group, &r);
                    break;
                }
                // Transient condition; handled by the retry logic below.
                Err(Error::Kafka(e @ KafkaCode::GroupCoordinatorNotAvailable)) => e,
                Err(e) => {
                    return Err(e);
                }
            };
            if attempt < config.retry_max_attempts {
                debug!(
                    "get_group_coordinator: will retry request (c: {}) due to: {:?}",
                    req.header.correlation_id, retry_code
                );
                attempt += 1;
                __retry_sleep(config);
            } else {
                return Err(Error::Kafka(retry_code));
            }
        }
    }
    state
        .group_coordinator(group)
        .ok_or(Error::Kafka(KafkaCode::GroupCoordinatorNotAvailable))
}
fn __commit_offsets(
req: &protocol::OffsetCommitRequest<'_, '_>,
state: &mut state::ClientState,
conn_pool: &mut network::Connections,
config: &ClientConfig,
) -> Result<()> {
let mut attempt = 1;
loop {
let now = Instant::now();
let tps = {
let host = __get_group_coordinator(req.group, state, conn_pool, config, now)?;
debug!(
"__commit_offsets: sending offset commit request '{:?}' to: {}",
req, host
);
__send_receive::<_, protocol::OffsetCommitResponse>(conn_pool, host, now, &req)?
.topic_partitions
};
let mut retry_code = None;
'rproc: for tp in tps {
for p in tp.partitions {
match p.to_error() {
None => {}
Some(e @ KafkaCode::GroupLoadInProgress) => {
retry_code = Some(e);
break 'rproc;
}
Some(e @ KafkaCode::NotCoordinatorForGroup) => {
debug!(
"commit_offsets: resetting group coordinator for '{}'",
req.group
);
state.remove_group_coordinator(req.group);
retry_code = Some(e);
break 'rproc;
}
Some(code) => {
return Err(Error::Kafka(code));
}
}
}
}
match retry_code {
Some(e) => {
if attempt < config.retry_max_attempts {
debug!(
"commit_offsets: will retry request (c: {}) due to: {:?}",
req.header.correlation_id, e
);
attempt += 1;
__retry_sleep(config);
}
}
None => {
return Ok(());
}
}
}
}
/// Fetches committed consumer-group offsets via the group coordinator,
/// retrying on transient group errors (`GroupLoadInProgress`,
/// `NotCoordinatorForGroup`) up to `config.retry_max_attempts` times.
///
/// On success, returns a map of topic name to that topic's partition
/// offsets.
fn __fetch_group_offsets(
    req: &protocol::OffsetFetchRequest<'_, '_, '_>,
    state: &mut state::ClientState,
    conn_pool: &mut network::Connections,
    config: &ClientConfig,
) -> Result<HashMap<String, Vec<PartitionOffset>>> {
    let mut attempt = 1;
    loop {
        let now = Instant::now();
        let r = {
            let host = __get_group_coordinator(req.group, state, conn_pool, config, now)?;
            debug!(
                "fetch_group_offsets: sending request {:?} to: {}",
                req, host
            );
            __send_receive::<_, protocol::OffsetFetchResponse>(conn_pool, host, now, &req)?
        };
        debug!("fetch_group_offsets: received response: {:#?}", r);
        let mut retry_code = None;
        // A group-level error applies to the whole response.
        if let Some(group_error) = r.group_error() {
            match group_error {
                Error::Kafka(e @ KafkaCode::GroupLoadInProgress) => {
                    retry_code = Some(e);
                }
                Error::Kafka(e @ KafkaCode::NotCoordinatorForGroup) => {
                    debug!(
                        "fetch_group_offsets: resetting group coordinator for '{}'",
                        req.group
                    );
                    // Force re-resolution of the coordinator on retry.
                    state.remove_group_coordinator(req.group);
                    retry_code = Some(e);
                }
                e => {
                    return Err(e);
                }
            }
        }
        let mut topic_map = HashMap::with_capacity(r.topic_partitions.len());
        'rproc: for tp in r.topic_partitions {
            // Skip collecting results when a group-level retry is due.
            if retry_code.is_some() {
                break;
            }
            let mut partition_offsets = Vec::with_capacity(tp.partitions.len());
            for p in tp.partitions {
                match p.get_offsets() {
                    Ok(o) => {
                        partition_offsets.push(o);
                    }
                    Err(Error::Kafka(e @ KafkaCode::GroupLoadInProgress)) => {
                        retry_code = Some(e);
                        break 'rproc;
                    }
                    Err(Error::Kafka(e @ KafkaCode::NotCoordinatorForGroup)) => {
                        debug!(
                            "fetch_group_offsets: resetting group coordinator for '{}'",
                            req.group
                        );
                        state.remove_group_coordinator(req.group);
                        retry_code = Some(e);
                        break 'rproc;
                    }
                    Err(e) => {
                        return Err(e);
                    }
                }
            }
            topic_map.insert(tp.topic, partition_offsets);
        }
        match retry_code {
            Some(e) => {
                if attempt < config.retry_max_attempts {
                    debug!(
                        "fetch_group_offsets: will retry request (c: {}) due to: {:?}",
                        req.header.correlation_id, e
                    );
                    attempt += 1;
                    __retry_sleep(config);
                } else {
                    return Err(Error::Kafka(e));
                }
            }
            None => {
                return Ok(topic_map);
            }
        }
    }
}
/// Issues the prepared fetch requests — one per broker host — and
/// collects the parsed responses.
fn __fetch_messages(
    conn_pool: &mut network::Connections,
    config: &ClientConfig,
    reqs: HashMap<&str, protocol::FetchRequest<'_, '_>>,
) -> Result<Vec<fetch::Response>> {
    let start = Instant::now();
    let mut out = Vec::with_capacity(reqs.len());
    for (host, request) in reqs {
        // The parser needs the originating request to interpret the
        // response, and optionally validates message CRCs.
        let parser = protocol::fetch::ResponseParser {
            validate_crc: config.fetch_crc_validation,
            requests: Some(&request),
        };
        out.push(__z_send_receive(conn_pool, host, start, &request, &parser)?);
    }
    Ok(out)
}
/// Sends the prepared produce requests — one per broker host.
///
/// When `no_acks` is true the brokers send no response, so nothing is
/// read back and an empty confirmation list is returned; otherwise the
/// per-topic confirmations of all responses are collected.
fn __produce_messages(
    conn_pool: &mut network::Connections,
    reqs: HashMap<&str, protocol::ProduceRequest<'_, '_>>,
    no_acks: bool,
) -> Result<Vec<ProduceConfirm>> {
    let start = Instant::now();
    if no_acks {
        // Fire-and-forget: only write the requests.
        for (host, req) in reqs {
            __send_noack::<_, protocol::ProduceResponse>(conn_pool, host, start, req)?;
        }
        return Ok(vec![]);
    }
    let mut confirms: Vec<ProduceConfirm> = vec![];
    for (host, req) in reqs {
        let resp = __send_receive::<_, protocol::ProduceResponse>(conn_pool, host, start, req)?;
        confirms.extend(resp.get_response());
    }
    Ok(confirms)
}
/// Obtains a connection to `host` from the pool and performs one full
/// request/response round-trip over it.
fn __send_receive<T, V>(
    conn_pool: &mut network::Connections,
    host: &str,
    now: Instant,
    req: T,
) -> Result<V::R>
where
    T: ToByte,
    V: FromByte,
{
    let conn = conn_pool.get_conn(host, now)?;
    __send_receive_conn::<T, V>(conn, req)
}
/// Performs one request/response round-trip on an already established
/// connection: write the request, then read and decode the reply.
fn __send_receive_conn<T, V>(conn: &mut network::KafkaConnection, req: T) -> Result<V::R>
where
    T: ToByte,
    V: FromByte,
{
    __send_request(conn, req).and_then(|_| __get_response::<V>(conn))
}
/// Writes a request to `host` without reading any response (used for
/// produce requests with acks == 0, where the broker stays silent).
/// Returns the number of bytes sent.
fn __send_noack<T, V>(
    conn_pool: &mut network::Connections,
    host: &str,
    now: Instant,
    req: T,
) -> Result<usize>
where
    T: ToByte,
    V: FromByte,
{
    __send_request(conn_pool.get_conn(host, now)?, req)
}
/// Serializes `request` with a 4-byte big-endian size prefix and writes
/// it to the connection, returning the number of bytes sent.
fn __send_request<T: ToByte>(conn: &mut network::KafkaConnection, request: T) -> Result<usize> {
    // Reserve a placeholder for the size prefix, then append the payload.
    let mut buf: Vec<u8> = vec![0u8; 4];
    request.encode(&mut buf)?;
    // Patch the payload length (excluding the prefix itself) into the
    // first four bytes.
    let body_len = buf.len() as i32 - 4;
    body_len.encode(&mut &mut buf[..])?;
    trace!("__send_request: Sending bytes: {:?}", &buf);
    conn.send(&buf)
}
/// Reads one size-prefixed response from the connection and decodes it
/// into `T`.
fn __get_response<T: FromByte>(conn: &mut network::KafkaConnection) -> Result<T::R> {
    let len = __get_response_size(conn)?;
    let body = conn.read_exact_alloc(len as u64)?;
    trace!("__get_response: received bytes: {:?}", &body);
    T::decode_new(&mut Cursor::new(body))
}
/// Like `__send_receive`, but decodes the response through a custom
/// `ResponseParser` instead of plain `FromByte`.
fn __z_send_receive<R, P>(
    conn_pool: &mut network::Connections,
    host: &str,
    now: Instant,
    req: R,
    parser: &P,
) -> Result<P::T>
where
    R: ToByte,
    P: ResponseParser,
{
    let c = conn_pool.get_conn(host, now)?;
    __send_request(c, req).and_then(|_| __z_get_response(c, parser))
}
/// Reads one size-prefixed response and hands the raw bytes to the
/// given parser.
fn __z_get_response<P>(conn: &mut network::KafkaConnection, parser: &P) -> Result<P::T>
where
    P: ResponseParser,
{
    let len = __get_response_size(conn)?;
    parser.parse(conn.read_exact_alloc(len as u64)?)
}
/// Reads the 4-byte size prefix of the next response from the wire.
fn __get_response_size(conn: &mut network::KafkaConnection) -> Result<i32> {
    let mut len_buf = [0u8; 4];
    conn.read_exact(&mut len_buf)?;
    i32::decode_new(&mut Cursor::new(&len_buf))
}
/// Blocks the current thread for the configured retry backoff time.
fn __retry_sleep(cfg: &ClientConfig) {
    thread::sleep(cfg.retry_backoff_time);
}