use crate::{
backoff::{Backoff, BackoffConfig, ErrorOrThrottle},
client::error::{Error, RequestContext, Result},
connection::{
BrokerCache, BrokerCacheGeneration, BrokerConnection, BrokerConnector, MessengerTransport,
MetadataLookupMode,
},
messenger::RequestError,
protocol::{
error::Error as ProtocolError,
messages::{
DeleteRecordsRequest, DeleteRecordsResponse, DeleteRequestPartition,
DeleteRequestTopic, DeleteResponsePartition, FetchRequest, FetchRequestPartition,
FetchRequestTopic, FetchResponse, FetchResponsePartition, IsolationLevel,
ListOffsetsRequest, ListOffsetsRequestPartition, ListOffsetsRequestTopic,
ListOffsetsResponse, ListOffsetsResponsePartition, NORMAL_CONSUMER, ProduceRequest,
ProduceRequestPartitionData, ProduceRequestTopicData, ProduceResponse,
},
primitives::*,
record::{Record as ProtocolRecord, *},
},
record::{Record, RecordAndOffset},
throttle::maybe_throttle,
validation::ExactlyOne,
};
use chrono::{DateTime, LocalResult, TimeZone, Utc};
use std::{
ops::{ControlFlow, Deref, Range},
sync::Arc,
};
use tokio::sync::Mutex;
use tracing::{debug, error, info};
use super::{error::ServerErrorResponse, metadata_cache::MetadataCacheGeneration};
/// Controls how the client reacts when the broker reports
/// `UnknownTopicOrPartition` for a request (see [`maybe_retry`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UnknownTopicHandling {
    /// Surface the error to the caller immediately.
    Error,
    /// Treat the error as retryable and keep retrying with backoff.
    Retry,
}
/// Compression codec applied to the record batch when producing.
///
/// All codecs except [`Compression::NoCompression`] are gated behind the
/// corresponding cargo feature.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum Compression {
    /// No compression (the default).
    #[default]
    NoCompression,
    #[cfg(feature = "compression-gzip")]
    Gzip,
    #[cfg(feature = "compression-lz4")]
    Lz4,
    #[cfg(feature = "compression-snappy")]
    Snappy,
    #[cfg(feature = "compression-zstd")]
    Zstd,
}
/// Position to resolve via a ListOffsets request (see
/// [`PartitionClient::get_offset`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OffsetAt {
    /// Earliest available offset (encoded as timestamp `-2` on the wire).
    Earliest,
    /// Latest offset (encoded as timestamp `-1` on the wire).
    Latest,
    /// Offset for the given timestamp.
    Timestamp(DateTime<Utc>),
}
/// Cached connection to the current partition leader, together with the
/// cache generations it was derived from.
#[derive(Debug)]
struct CurrentBroker {
    // `None` when no leader connection is currently cached.
    broker: Option<BrokerConnection>,
    // Generation of this cache entry; bumped on every refresh so stale
    // invalidation requests can be detected and ignored.
    gen_broker: BrokerCacheGeneration,
    // Metadata cache generation of the leader lookup performed via an
    // arbitrary broker (`None` if that lookup was not served from cache).
    gen_leader_from_arbitrary: Option<MetadataCacheGeneration>,
    // Metadata cache generation of the leader self-confirmation lookup.
    gen_leader_from_self: Option<MetadataCacheGeneration>,
}
/// Client scoped to a single topic partition.
///
/// Caches a connection to the partition leader and re-discovers the leader
/// (with metadata-cache invalidation) when requests fail.
pub struct PartitionClient {
    topic: String,
    partition: i32,
    brokers: Arc<BrokerConnector>,
    backoff_config: Arc<BackoffConfig>,
    // Cached leader connection; an async mutex because establishing a new
    // connection requires awaiting while the lock is held.
    current_broker: Mutex<CurrentBroker>,
    unknown_topic_handling: UnknownTopicHandling,
}
impl std::fmt::Debug for PartitionClient {
    /// Renders as `PartitionClient(<topic>:<partition>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!(
            "PartitionClient({}:{})",
            self.topic, self.partition
        ))
    }
}
impl PartitionClient {
    /// Create a new partition client and eagerly detect the partition leader.
    ///
    /// Leader detection runs through the standard retry loop so that
    /// transient errors (and, depending on `unknown_topic_handling`,
    /// unknown-topic errors) are retried with backoff before construction
    /// fails.
    pub(super) async fn new(
        topic: String,
        partition: i32,
        brokers: Arc<BrokerConnector>,
        unknown_topic_handling: UnknownTopicHandling,
        backoff_config: Arc<BackoffConfig>,
    ) -> Result<Self> {
        let p = Self {
            topic,
            partition,
            brokers: Arc::clone(&brokers),
            backoff_config,
            current_broker: Mutex::new(CurrentBroker {
                broker: None,
                gen_broker: BrokerCacheGeneration::START,
                gen_leader_from_arbitrary: None,
                gen_leader_from_self: None,
            }),
            unknown_topic_handling,
        };

        // Perform the initial leader lookup + connection so that obvious
        // problems surface at construction time rather than on first use.
        let scope = &p;
        maybe_retry(
            &p.backoff_config,
            p.unknown_topic_handling,
            &*brokers,
            "leader_detection",
            || async move {
                scope
                    .get()
                    .await
                    // No broker-cache generation yet: nothing to invalidate.
                    .map_err(|e| ErrorOrThrottle::Error((e, None)))?;
                Ok(())
            },
        )
        .await?;

        Ok(p)
    }

    /// Topic name this client operates on.
    pub fn topic(&self) -> &str {
        &self.topic
    }

    /// Partition index this client operates on.
    pub fn partition(&self) -> i32 {
        self.partition
    }

    /// Produce `records` to this partition and return the assigned offsets
    /// (one per input record, in order).
    ///
    /// Returns an empty vector without contacting the broker when `records`
    /// is empty. Retries on retryable errors (see [`maybe_retry`]).
    pub async fn produce(
        &self,
        records: Vec<Record>,
        compression: Compression,
    ) -> Result<Vec<i64>> {
        if records.is_empty() {
            return Ok(vec![]);
        }
        // Remember the count before `records` is moved into the request.
        let n = records.len() as i64;
        let request = &build_produce_request(self.partition, &self.topic, records, compression);

        maybe_retry(
            &self.backoff_config,
            self.unknown_topic_handling,
            self,
            "produce",
            || async move {
                let (broker, r#gen) = self
                    .get()
                    .await
                    .map_err(|e| ErrorOrThrottle::Error((e, None)))?;
                let response = broker
                    .request(&request)
                    .await
                    // Attach the cache generation so a failure can
                    // invalidate exactly the connection it used.
                    .map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?;
                maybe_throttle(response.throttle_time_ms)?;
                process_produce_response(self.partition, &self.topic, n, response)
                    .map_err(|e| ErrorOrThrottle::Error((e, Some(r#gen))))
            },
        )
        .await
    }

    /// Fetch records starting at `offset`, returning the records together
    /// with the partition's high watermark.
    ///
    /// `bytes` bounds the response size; `max_wait_ms` is how long the
    /// broker may wait to accumulate `bytes.start` bytes.
    pub async fn fetch_records(
        &self,
        offset: i64,
        bytes: Range<i32>,
        max_wait_ms: i32,
    ) -> Result<(Vec<RecordAndOffset>, i64)> {
        let request = &build_fetch_request(offset, bytes, max_wait_ms, self.partition, &self.topic);

        let partition = maybe_retry(
            &self.backoff_config,
            self.unknown_topic_handling,
            self,
            "fetch_records",
            || async move {
                let (broker, r#gen) = self
                    .get()
                    .await
                    .map_err(|e| ErrorOrThrottle::Error((e, None)))?;
                let response = broker
                    .request(&request)
                    .await
                    .map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?;
                maybe_throttle(response.throttle_time_ms)?;
                process_fetch_response(self.partition, &self.topic, response, offset)
                    .map_err(|e| ErrorOrThrottle::Error((e, Some(r#gen))))
            },
        )
        .await?;

        // Batches may start before the requested offset; `extract_records`
        // filters those out.
        let records = extract_records(partition.records.0, offset)?;

        Ok((records, partition.high_watermark.0))
    }

    /// Resolve an offset for this partition (earliest, latest, or by
    /// timestamp) via a ListOffsets request.
    pub async fn get_offset(&self, at: OffsetAt) -> Result<i64> {
        let request = &build_list_offsets_request(self.partition, &self.topic, at);

        let partition = maybe_retry(
            &self.backoff_config,
            self.unknown_topic_handling,
            self,
            "get_offset",
            || async move {
                let (broker, r#gen) = self
                    .get()
                    .await
                    .map_err(|e| ErrorOrThrottle::Error((e, None)))?;
                let response = broker
                    .request(&request)
                    .await
                    .map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?;
                maybe_throttle(response.throttle_time_ms)?;
                process_list_offsets_response(self.partition, &self.topic, response)
                    .map_err(|e| ErrorOrThrottle::Error((e, Some(r#gen))))
            },
        )
        .await?;

        extract_offset(partition)
    }

    /// Delete all records of this partition up to (exclusive) `offset`.
    pub async fn delete_records(&self, offset: i64, timeout_ms: i32) -> Result<()> {
        let request =
            &build_delete_records_request(offset, timeout_ms, &self.topic, self.partition);

        maybe_retry(
            &self.backoff_config,
            self.unknown_topic_handling,
            self,
            "delete_records",
            || async move {
                let (broker, r#gen) = self
                    .get()
                    .await
                    .map_err(|e| ErrorOrThrottle::Error((e, None)))?;
                let response = broker
                    .request(&request)
                    .await
                    .map_err(|e| ErrorOrThrottle::Error((e.into(), Some(r#gen))))?;
                // This response's throttle time is not optional, hence the
                // explicit `Some`.
                maybe_throttle(Some(response.throttle_time_ms))?;
                process_delete_records_response(&self.topic, self.partition, response)
                    .map_err(|e| ErrorOrThrottle::Error((e, Some(r#gen))))
            },
        )
        .await?;

        Ok(())
    }

    /// Determine the broker id of the current partition leader, returning it
    /// together with the metadata cache generation the answer came from (if
    /// it was served from cache).
    ///
    /// Validates that the metadata response actually covers this
    /// topic/partition and maps topic/partition-level errors into
    /// [`Error::ServerError`]. A leader id of `-1` (no leader elected yet)
    /// is reported as a *virtual* `LeaderNotAvailable` error, which
    /// [`maybe_retry`] treats as retryable.
    async fn get_leader(
        &self,
        metadata_mode: MetadataLookupMode,
    ) -> Result<(i32, Option<MetadataCacheGeneration>)> {
        let (metadata, r#gen) = self
            .brokers
            .request_metadata(&metadata_mode, Some(vec![self.topic.clone()]))
            .await?;

        let topic = metadata
            .topics
            .exactly_one()
            .map_err(Error::exactly_one_topic)?;

        if topic.name.0 != self.topic {
            return Err(Error::InvalidResponse(format!(
                "Expected metadata for topic \"{}\" got \"{}\"",
                self.topic, topic.name.0
            )));
        }

        if let Some(e) = topic.error {
            return Err(Error::ServerError {
                protocol_error: e,
                error_message: None,
                request: RequestContext::Topic(self.topic.clone()),
                response: None,
                is_virtual: false,
            });
        }

        let partition = topic
            .partitions
            .iter()
            .find(|p| p.partition_index.0 == self.partition)
            .ok_or_else(|| {
                Error::InvalidResponse(format!(
                    "Could not find metadata for partition {} in topic \"{}\"",
                    self.partition, self.topic
                ))
            })?;

        if let Some(e) = partition.error {
            return Err(Error::ServerError {
                protocol_error: e,
                error_message: None,
                request: RequestContext::Partition(self.topic.clone(), self.partition),
                response: None,
                is_virtual: false,
            });
        }

        // A leader id of -1 means no leader is currently elected; emit a
        // virtual (client-derived) error so the retry loop backs off.
        if partition.leader_id.0 == -1 {
            return Err(Error::ServerError {
                protocol_error: ProtocolError::LeaderNotAvailable,
                error_message: None,
                request: RequestContext::Partition(self.topic.clone(), self.partition),
                response: None,
                is_virtual: true,
            });
        }

        info!(
            topic=%self.topic,
            partition=%self.partition,
            leader=partition.leader_id.0,
            %metadata_mode,
            "Detected leader",
        );

        Ok((partition.leader_id.0, r#gen))
    }
}
impl BrokerCache for &PartitionClient {
    type R = MessengerTransport;
    type E = Error;

    /// Get a connection to the partition leader, establishing and caching
    /// one if none is cached.
    ///
    /// Establishing a connection:
    /// 1. determines the leader from (possibly cached) metadata obtained via
    ///    an arbitrary broker,
    /// 2. connects to that broker,
    /// 3. asks that broker itself for partition metadata to confirm it
    ///    considers itself the leader.
    ///
    /// On failure, the metadata-cache generations involved are invalidated
    /// so that a retry fetches fresh metadata. A leadership mismatch in step
    /// 3 is reported as a virtual `NotLeaderOrFollower` error, which
    /// [`maybe_retry`] treats as retryable.
    async fn get(&self) -> Result<(Arc<Self::R>, BrokerCacheGeneration)> {
        let mut current_broker = self.current_broker.lock().await;

        // Fast path: reuse the cached leader connection.
        // FIX: this previously read `¤t_broker.broker` (mis-encoded
        // `&current_broker.broker`), which does not compile.
        if let Some(broker) = &current_broker.broker {
            return Ok((Arc::clone(broker), current_broker.gen_broker));
        }

        info!(
            topic=%self.topic,
            partition=%self.partition,
            "Creating new partition-specific broker connection",
        );

        let (leader, gen_leader_from_arbitrary) =
            self.get_leader(MetadataLookupMode::CachedArbitrary).await?;
        let broker = match self.brokers.connect(leader).await {
            Ok(Some(c)) => Ok(c),
            Ok(None) => {
                // The alleged leader is not part of the known broker set;
                // the cached metadata is likely stale.
                if let Some(r#gen) = gen_leader_from_arbitrary {
                    self.brokers.invalidate_metadata_cache(
                        "partition client: broker that is leader is unknown",
                        r#gen,
                    );
                }
                Err(Error::InvalidResponse(format!(
                    "Partition leader {} not found in metadata response",
                    leader
                )))
            }
            Err(e) => {
                if let Some(r#gen) = gen_leader_from_arbitrary {
                    self.brokers.invalidate_metadata_cache(
                        "partition client: error connecting to partition leader",
                        r#gen,
                    );
                }
                Err(e.into())
            }
        }?;

        // Confirm with the broker itself that it is (still) the leader.
        let (leader_self, gen_leader_from_self) = self
            .get_leader(MetadataLookupMode::SpecificBroker(Arc::clone(&broker)))
            .await?;

        if leader != leader_self {
            if let Some(r#gen) = gen_leader_from_self {
                // FIX: the reason previously read "does treat itself as a
                // leader", which stated the opposite of the condition
                // (`leader != leader_self` means the broker does NOT
                // consider itself the leader).
                self.brokers.invalidate_metadata_cache(
                    "partition client: broker that should be leader does not treat itself as a leader",
                    r#gen,
                );
            }

            // Virtual error: derived from inconsistent metadata rather than
            // reported by the server; retryable by `maybe_retry`.
            return Err(Error::ServerError {
                protocol_error: ProtocolError::NotLeaderOrFollower,
                error_message: None,
                request: RequestContext::Partition(self.topic.clone(), self.partition),
                response: Some(ServerErrorResponse::LeaderForward {
                    broker: leader,
                    new_leader: leader_self,
                }),
                is_virtual: true,
            });
        }

        *current_broker = CurrentBroker {
            broker: Some(Arc::clone(&broker)),
            // Bump the generation so in-flight invalidation requests against
            // the old connection become no-ops.
            gen_broker: current_broker.gen_broker.bump(),
            gen_leader_from_arbitrary,
            gen_leader_from_self,
        };

        info!(
            topic=%self.topic,
            partition=%self.partition,
            leader,
            "Created new partition-specific broker connection",
        );

        Ok((broker, current_broker.gen_broker))
    }

    /// Invalidate the cached leader connection.
    ///
    /// A no-op when `gen` does not match the currently cached generation,
    /// i.e. the cache was already refreshed after the failing request was
    /// issued. Also invalidates the metadata-cache generations the cached
    /// leader was derived from.
    async fn invalidate(&self, reason: &'static str, r#gen: BrokerCacheGeneration) {
        let mut current_broker = self.current_broker.lock().await;

        if current_broker.gen_broker != r#gen {
            // Stale request: the connection was already replaced.
            debug!(
                reason,
                current_gen = current_broker.gen_broker.get(),
                request_gen = r#gen.get(),
                "stale invalidation request for arbitrary broker cache",
            );
            return;
        }

        info!(
            topic = self.topic.deref(),
            partition = self.partition,
            reason,
            // FIX: typo "Invaliding" -> "Invalidating".
            "Invalidating cached leader",
        );

        if let Some(r#gen) = current_broker.gen_leader_from_arbitrary {
            self.brokers.invalidate_metadata_cache(reason, r#gen);
        }
        if let Some(r#gen) = current_broker.gen_leader_from_self {
            self.brokers.invalidate_metadata_cache(reason, r#gen);
        }

        current_broker.broker = None
    }
}
/// Execute `f` with backoff-based retries, classifying errors into
/// retryable and fatal ones.
///
/// `f` returns either a value, a throttle signal (always retried after
/// waiting), or an error paired with the broker-cache generation the
/// request was issued against (`None` if no connection was obtained yet).
///
/// Retryable cases:
/// - broken connections / IO errors — also invalidates the broker cache
///   entry for that generation,
/// - `InvalidReplicationFactor`, `LeaderNotAvailable`, `OffsetNotAvailable`,
/// - `NotLeaderOrFollower` — also invalidates the broker cache,
/// - `UnknownTopicOrPartition` — also invalidates the broker cache, but only
///   retried when `unknown_topic_handling` is [`UnknownTopicHandling::Retry`].
///
/// All other errors are fatal and returned immediately; exhausting the
/// backoff yields [`Error::RetryFailed`].
async fn maybe_retry<B, R, F, T>(
    backoff_config: &BackoffConfig,
    unknown_topic_handling: UnknownTopicHandling,
    broker_cache: B,
    request_name: &str,
    f: R,
) -> Result<T>
where
    B: BrokerCache,
    R: (Fn() -> F) + Send + Sync,
    F: std::future::Future<
        Output = Result<T, ErrorOrThrottle<(Error, Option<BrokerCacheGeneration>)>>,
    > + Send,
{
    let mut backoff = Backoff::new(backoff_config);

    backoff
        .retry_with_backoff(request_name, || async {
            let (error, cache_gen) = match f().await {
                Ok(v) => {
                    return ControlFlow::Break(Ok(v));
                }
                Err(ErrorOrThrottle::Throttle(throttle)) => {
                    // Server asked us to slow down; always retry.
                    return ControlFlow::Continue(ErrorOrThrottle::Throttle(throttle));
                }
                Err(ErrorOrThrottle::Error(e)) => e,
            };

            // Decide whether this error class is worth retrying, possibly
            // invalidating the broker connection it occurred on first.
            let retry = match error {
                Error::Request(RequestError::Poisoned(_) | RequestError::IO(_))
                | Error::Connection(_) => {
                    if let Some(cache_gen) = cache_gen {
                        broker_cache
                            .invalidate("partition client: connection broken", cache_gen)
                            .await;
                    }
                    true
                }
                Error::ServerError {
                    protocol_error:
                        ProtocolError::InvalidReplicationFactor
                        | ProtocolError::LeaderNotAvailable
                        | ProtocolError::OffsetNotAvailable,
                    ..
                } => true,
                Error::ServerError {
                    protocol_error: ProtocolError::NotLeaderOrFollower,
                    ..
                } => {
                    if let Some(cache_gen) = cache_gen {
                        broker_cache
                            .invalidate(
                                "partition client: server error: not leader or follower",
                                cache_gen,
                            )
                            .await;
                    }
                    true
                }
                Error::ServerError {
                    protocol_error: ProtocolError::UnknownTopicOrPartition,
                    ..
                } => {
                    if let Some(cache_gen) = cache_gen {
                        broker_cache
                            .invalidate(
                                "partition client: server error: unknown topic or partition",
                                cache_gen,
                            )
                            .await;
                    }
                    // Caller-configurable: the topic may still be propagating.
                    unknown_topic_handling == UnknownTopicHandling::Retry
                }
                _ => false,
            };

            if retry {
                ControlFlow::Continue(ErrorOrThrottle::Error(error))
            } else {
                error!(
                    e=%error,
                    request_name,
                    "request encountered fatal error",
                );
                ControlFlow::Break(Err(error))
            }
        })
        .await
        .map_err(Error::RetryFailed)?
}
/// Build a produce request containing a single record batch for the given
/// partition.
///
/// Record timestamps and offsets are delta-encoded against the first
/// record; the request uses `acks=-1` (wait for all in-sync replicas) and a
/// 30s server-side timeout.
///
/// # Panics
/// Panics if `records` is empty. The only caller,
/// [`PartitionClient::produce`], returns early in that case.
fn build_produce_request(
    partition: i32,
    topic: &str,
    records: Vec<Record>,
    compression: Compression,
) -> ProduceRequest {
    let n = records.len() as i32;

    // Batch header encodes the first/max timestamps; per-record timestamps
    // are millisecond deltas from `first_timestamp`.
    let first_timestamp = records.first().unwrap().timestamp;
    let mut max_timestamp = first_timestamp;

    let records = records
        .into_iter()
        .enumerate()
        .map(|(offset_delta, record)| {
            max_timestamp = max_timestamp.max(record.timestamp);

            ProtocolRecord {
                key: record.key,
                value: record.value,
                timestamp_delta: (record.timestamp - first_timestamp).num_milliseconds(),
                offset_delta: offset_delta as i32,
                headers: record
                    .headers
                    .into_iter()
                    .map(|(key, value)| RecordHeader { key, value })
                    .collect(),
            }
        })
        .collect();

    let record_batch = ProduceRequestPartitionData {
        index: Int32(partition),
        records: Records(vec![RecordBatch {
            base_offset: 0,
            partition_leader_epoch: 0,
            last_offset_delta: n - 1,
            is_transactional: false,
            // NOTE(review): -1 producer id/epoch/sequence appear to disable
            // idempotent-producer semantics — confirm against protocol docs.
            base_sequence: -1,
            compression: match compression {
                Compression::NoCompression => RecordBatchCompression::NoCompression,
                #[cfg(feature = "compression-gzip")]
                Compression::Gzip => RecordBatchCompression::Gzip,
                #[cfg(feature = "compression-lz4")]
                Compression::Lz4 => RecordBatchCompression::Lz4,
                #[cfg(feature = "compression-snappy")]
                Compression::Snappy => RecordBatchCompression::Snappy,
                #[cfg(feature = "compression-zstd")]
                Compression::Zstd => RecordBatchCompression::Zstd,
            },
            timestamp_type: RecordBatchTimestampType::CreateTime,
            producer_id: -1,
            producer_epoch: -1,
            first_timestamp: first_timestamp.timestamp_millis(),
            max_timestamp: max_timestamp.timestamp_millis(),
            records: ControlBatchOrRecords::Records(records),
        }]),
    };

    ProduceRequest {
        transactional_id: crate::protocol::primitives::NullableString(None),
        // acks=-1: require acknowledgement from all in-sync replicas.
        acks: Int16(-1),
        timeout_ms: Int32(30_000),
        topic_data: vec![ProduceRequestTopicData {
            name: String_(topic.to_string()),
            partition_data: vec![record_batch],
        }],
    }
}
fn process_produce_response(
partition: i32,
topic: &str,
num_records: i64,
response: ProduceResponse,
) -> Result<Vec<i64>> {
let response = response
.responses
.exactly_one()
.map_err(Error::exactly_one_topic)?;
if response.name.0 != topic {
return Err(Error::InvalidResponse(format!(
"Expected write for topic \"{}\" got \"{}\"",
topic, response.name.0,
)));
}
let response = response
.partition_responses
.exactly_one()
.map_err(Error::exactly_one_partition)?;
if response.index.0 != partition {
return Err(Error::InvalidResponse(format!(
"Expected partition {} for topic \"{}\" got {}",
partition, topic, response.index.0,
)));
}
match response.error {
Some(e) => Err(Error::ServerError {
protocol_error: e,
error_message: None,
request: RequestContext::Partition(topic.to_owned(), partition),
response: None,
is_virtual: false,
}),
None => Ok((0..num_records)
.map(|x| x + response.base_offset.0)
.collect()),
}
}
/// Build a fetch request for a single partition starting at `offset`.
///
/// `bytes` is a half-open range: `bytes.start` is the minimum number of
/// bytes the broker should accumulate before responding (bounded by
/// `max_wait_ms`), and the exclusive `bytes.end` is converted to the
/// inclusive wire maximum via `saturating_sub(1)`.
fn build_fetch_request(
    offset: i64,
    bytes: Range<i32>,
    max_wait_ms: i32,
    partition: i32,
    topic: &str,
) -> FetchRequest {
    FetchRequest {
        replica_id: NORMAL_CONSUMER,
        max_wait_ms: Int32(max_wait_ms),
        min_bytes: Int32(bytes.start),
        max_bytes: Some(Int32(bytes.end.saturating_sub(1))),
        // Only return records below the last stable offset.
        isolation_level: Some(IsolationLevel::ReadCommitted),
        topics: vec![FetchRequestTopic {
            topic: String_(topic.to_string()),
            partitions: vec![FetchRequestPartition {
                partition: Int32(partition),
                fetch_offset: Int64(offset),
                partition_max_bytes: Int32(bytes.end.saturating_sub(1)),
            }],
        }],
    }
}
fn process_fetch_response(
partition: i32,
topic: &str,
response: FetchResponse,
request_offset: i64,
) -> Result<FetchResponsePartition> {
let response_topic = response
.responses
.exactly_one()
.map_err(Error::exactly_one_topic)?;
if response_topic.topic.0 != topic {
return Err(Error::InvalidResponse(format!(
"Expected data for topic '{}' but got data for topic '{}'",
topic, response_topic.topic.0
)));
}
let response_partition = response_topic
.partitions
.exactly_one()
.map_err(Error::exactly_one_partition)?;
if response_partition.partition_index.0 != partition {
return Err(Error::InvalidResponse(format!(
"Expected data for partition {} but got data for partition {}",
partition, response_partition.partition_index.0
)));
}
if let Some(err) = response_partition.error_code {
return Err(Error::ServerError {
protocol_error: err,
error_message: None,
request: RequestContext::Fetch {
topic_name: topic.to_owned(),
partition_id: partition,
offset: request_offset,
},
response: Some(ServerErrorResponse::PartitionFetchState {
high_watermark: response_partition.high_watermark.0,
last_stable_offset: response_partition.last_stable_offset.map(|x| x.0),
}),
is_virtual: false,
});
}
Ok(response_partition)
}
/// Convert the record batches of a fetch response into [`RecordAndOffset`]s,
/// skipping everything before `request_offset`.
///
/// Fetch responses may contain batches that start before the requested
/// offset (batches are returned whole), so records with a smaller offset
/// are filtered out here. Control batches carry no user records and are
/// skipped entirely.
///
/// # Errors
/// Returns [`Error::InvalidResponse`] if a record's reconstructed timestamp
/// overflows `i64` milliseconds or does not map to a valid, unambiguous UTC
/// timestamp.
fn extract_records(
    partition_records: Vec<RecordBatch>,
    request_offset: i64,
) -> Result<Vec<RecordAndOffset>> {
    let mut records = vec![];

    for batch in partition_records {
        match batch.records {
            ControlBatchOrRecords::ControlBatch(_) => {
                // Control batches are not exposed to the user.
            }
            ControlBatchOrRecords::Records(protocol_records) => {
                records.reserve(protocol_records.len());

                for record in protocol_records {
                    // Offsets are delta-encoded against the batch header.
                    let offset = batch.base_offset + record.offset_delta as i64;
                    if offset < request_offset {
                        // Batch starts before the requested offset; skip.
                        continue;
                    }

                    // Timestamps are delta-encoded too; guard the addition
                    // against overflow from a malicious/buggy server.
                    let timestamp_millis =
                        match batch.first_timestamp.checked_add(record.timestamp_delta) {
                            Some(ts) => ts,
                            None => {
                                return Err(Error::InvalidResponse(format!(
                                    "Timestamp overflow (first_timestamp={}, delta={}",
                                    batch.first_timestamp, record.timestamp_delta
                                )));
                            }
                        };
                    let timestamp = match Utc.timestamp_millis_opt(timestamp_millis) {
                        LocalResult::None => {
                            return Err(Error::InvalidResponse(format!(
                                "Not a valid timestamp ({timestamp_millis})"
                            )));
                        }
                        LocalResult::Single(ts) => ts,
                        LocalResult::Ambiguous(a, b) => {
                            return Err(Error::InvalidResponse(format!(
                                "Ambiguous timestamp ({timestamp_millis}): {a} or {b}"
                            )));
                        }
                    };

                    records.push(RecordAndOffset {
                        record: Record {
                            key: record.key,
                            value: record.value,
                            headers: record
                                .headers
                                .into_iter()
                                .map(|header| (header.key, header.value))
                                .collect(),
                            timestamp,
                        },
                        offset,
                    })
                }
            }
        }
    }

    Ok(records)
}
/// Build a ListOffsets request for a single partition.
///
/// "Earliest" and "latest" are encoded as the sentinel timestamps `-2` and
/// `-1` respectively; a concrete timestamp is sent as epoch milliseconds.
fn build_list_offsets_request(partition: i32, topic: &str, at: OffsetAt) -> ListOffsetsRequest {
    let timestamp = Int64(match at {
        OffsetAt::Earliest => -2,
        OffsetAt::Latest => -1,
        OffsetAt::Timestamp(ts) => ts.timestamp_millis(),
    });

    let partitions = vec![ListOffsetsRequestPartition {
        partition_index: Int32(partition),
        timestamp,
        // Only relevant for old protocol versions that return offset arrays.
        max_num_offsets: Some(Int32(1)),
    }];

    ListOffsetsRequest {
        replica_id: NORMAL_CONSUMER,
        isolation_level: Some(IsolationLevel::ReadCommitted),
        topics: vec![ListOffsetsRequestTopic {
            name: String_(topic.to_owned()),
            partitions,
        }],
    }
}
/// Validate a ListOffsets response and extract the single partition payload.
///
/// Verifies that exactly one topic and one partition entry are present and
/// match the request, then maps a partition-level error code into
/// [`Error::ServerError`].
fn process_list_offsets_response(
    partition: i32,
    topic: &str,
    response: ListOffsetsResponse,
) -> Result<ListOffsetsResponsePartition> {
    let response_topic = response
        .topics
        .exactly_one()
        .map_err(Error::exactly_one_topic)?;
    if response_topic.name.0 != topic {
        return Err(Error::InvalidResponse(format!(
            "Expected data for topic '{}' but got data for topic '{}'",
            topic, response_topic.name.0
        )));
    }

    let response_partition = response_topic
        .partitions
        .exactly_one()
        .map_err(Error::exactly_one_partition)?;
    if response_partition.partition_index.0 != partition {
        return Err(Error::InvalidResponse(format!(
            "Expected data for partition {} but got data for partition {}",
            partition, response_partition.partition_index.0
        )));
    }

    match response_partition.error_code {
        Some(err) => Err(Error::ServerError {
            protocol_error: err,
            error_message: None,
            request: RequestContext::Partition(topic.to_owned(), partition),
            response: None,
            is_virtual: false,
        }),
        None => Ok(response_partition),
    }
}
fn extract_offset(partition: ListOffsetsResponsePartition) -> Result<i64> {
match (
partition.old_style_offsets.as_ref(),
partition.offset.as_ref(),
) {
(Some(offsets), None) => match offsets.0.as_ref() {
Some(offsets) => match offsets.len() {
1 => Ok(offsets[0].0),
n => Err(Error::InvalidResponse(format!(
"Expected 1 offset to be returned but got {}",
n
))),
},
None => Err(Error::InvalidResponse(
"Got NULL as offset array".to_owned(),
)),
},
(None, Some(offset)) => Ok(offset.0),
_ => unreachable!(),
}
}
/// Build a DeleteRecords request that deletes all records of the given
/// partition up to (exclusive) `offset`.
fn build_delete_records_request(
    offset: i64,
    timeout_ms: i32,
    topic: &str,
    partition: i32,
) -> DeleteRecordsRequest {
    let partition_spec = DeleteRequestPartition {
        partition_index: Int32(partition),
        offset: Int64(offset),
        tagged_fields: None,
    };
    let topic_spec = DeleteRequestTopic {
        name: String_(topic.to_string()),
        partitions: vec![partition_spec],
        tagged_fields: None,
    };

    DeleteRecordsRequest {
        topics: vec![topic_spec],
        timeout_ms: Int32(timeout_ms),
        tagged_fields: None,
    }
}
/// Validate a DeleteRecords response and extract the single partition
/// payload.
///
/// Verifies that exactly one topic and one partition entry are present and
/// match the request, then maps a partition-level error into
/// [`Error::ServerError`].
fn process_delete_records_response(
    topic: &str,
    partition: i32,
    response: DeleteRecordsResponse,
) -> Result<DeleteResponsePartition> {
    let response_topic = response
        .topics
        .exactly_one()
        .map_err(Error::exactly_one_topic)?;
    if response_topic.name.0 != topic {
        return Err(Error::InvalidResponse(format!(
            "Expected data for topic '{}' but got data for topic '{}'",
            topic, response_topic.name.0
        )));
    }

    let response_partition = response_topic
        .partitions
        .exactly_one()
        .map_err(Error::exactly_one_partition)?;
    if response_partition.partition_index.0 != partition {
        return Err(Error::InvalidResponse(format!(
            "Expected data for partition {} but got data for partition {}",
            partition, response_partition.partition_index.0
        )));
    }

    match response_partition.error {
        Some(err) => Err(Error::ServerError {
            protocol_error: err,
            error_message: None,
            request: RequestContext::Partition(topic.to_owned(), partition),
            response: None,
            is_virtual: false,
        }),
        None => Ok(response_partition),
    }
}