use std::{cmp::min, time::SystemTime};
use nisshi_sans_io::{
ApiKey, ErrorCode, FetchRequest, FetchResponse, IsolationLevel,
fetch_request::{FetchPartition, FetchTopic},
fetch_response::{
EpochEndOffset, FetchableTopicResponse, LeaderIdAndEpoch, PartitionData, SnapshotId,
},
metadata_response::MetadataResponseTopic,
record::deflated::{Batch, Frame},
};
use rama::{Context, Service};
use tokio::time::{Duration, Instant, sleep};
use tracing::{debug, error, instrument};
use crate::{Error, Result, Storage, Topition};
/// Handler for Fetch requests.
///
/// The service is a fieldless unit struct: everything it reads (topic metadata, stored batches,
/// partition offsets) comes from the `Storage` held in the request context passed to each call.
#[derive(Clone, Copy, Debug, Default)]
#[derive(Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct FetchService;

/// Associates `FetchService` with the API key of [`FetchRequest`], so the two always agree.
impl ApiKey for FetchService {
    const KEY: i16 = FetchRequest::KEY;
}
impl FetchService {
    /// Fetches record batches for a single partition of `topic`.
    ///
    /// Storage is read repeatedly, starting at the partition's requested `fetch_offset` and
    /// advancing past every returned batch. Reading stops when either of these happens:
    /// - storage returns nothing, or a leading batch with no records (that batch is not kept);
    /// - the byte budget in `max_bytes` is exhausted.
    ///
    /// `max_bytes` is decremented by the size of everything fetched, so the caller can see how
    /// much of the budget this partition consumed.
    ///
    /// The returned `PartitionData` holds the fetched batches, or `None` when nothing was read.
    /// It also holds the high watermark, last stable offset and log start offset taken from the
    /// storage's `offset_stage` for the partition.
    ///
    /// # Errors
    ///
    /// Propagates storage errors from `fetch` / `offset_stage`, and fails when the size of a
    /// fetched chunk does not fit in a `u32`.
    // Each argument is an independent request limit threaded down from `fetch`.
    #[allow(clippy::too_many_arguments)]
    #[instrument(skip(self,ctx,min_bytes,isolation,fetch_partition), fields(partition = fetch_partition.partition))]
    async fn fetch_partition<G>(
        &self,
        ctx: &Context<G>,
        max_wait: Duration,
        min_bytes: u32,
        max_bytes: &mut u32,
        isolation: IsolationLevel,
        topic: &str,
        fetch_partition: &FetchPartition,
    ) -> Result<PartitionData>
    where
        G: Storage,
    {
        let started_at = Instant::now();
        let partition_index = fetch_partition.partition;
        let tp = Topition::new(topic, partition_index);
        let mut batches = Vec::new();
        let mut offset = fetch_partition.fetch_offset;

        while *max_bytes > 0 {
            debug!(offset);

            // Whatever is left of this partition's wait is handed to storage as its wait limit.
            let mut fetched = ctx
                .state()
                .fetch(
                    &tp,
                    offset,
                    min_bytes,
                    *max_bytes,
                    isolation,
                    max_wait.saturating_sub(started_at.elapsed()),
                )
                .await
                .inspect(|r| debug!(?tp, ?offset, ?r))
                .inspect_err(|error| error!(?tp, ?error))?;

            *max_bytes =
                u32::try_from(fetched.byte_size()).map(|bytes| max_bytes.saturating_sub(bytes))?;
            debug!(?offset, ?fetched, max_bytes);

            // Nothing more to read: stop without keeping an empty leading batch.
            if fetched.is_empty() || fetched.first().is_some_and(|batch| batch.record_count == 0) {
                break;
            }

            // Resume after the furthest record returned so far (assumes the offsets within a
            // batch are contiguous from `base_offset`).
            if let Some(latest) = fetched
                .iter()
                .map(|batch| batch.base_offset + batch.record_count as i64)
                .max()
                .inspect(|latest| debug!(latest))
            {
                offset = latest;
            }

            batches.append(&mut fetched);
        }

        let offset_stage = ctx
            .state()
            .offset_stage(&tp)
            .await
            .inspect_err(|error| error!(?error, ?tp))?;

        Ok(PartitionData::default()
            .partition_index(partition_index)
            .error_code(ErrorCode::None.into())
            .high_watermark(offset_stage.high_watermark())
            .last_stable_offset(Some(offset_stage.last_stable()))
            .log_start_offset(Some(offset_stage.log_start()))
            .diverging_epoch(None)
            .current_leader(None)
            .snapshot_id(None)
            .aborted_transactions(Some([].into()))
            .preferred_read_replica(Some(-1))
            .records(if batches.is_empty() {
                None
            } else {
                Some(Frame { batches })
            }))
        .inspect(|r| debug!(?r, elapsed = ?started_at.elapsed()))
    }

    /// Builds the response for a topic that is not present in the metadata.
    ///
    /// The topic id is all zeroes. Every requested partition is reported with
    /// `UnknownTopicOrPartition`, no records, and placeholder offsets, epochs and leader.
    /// This always returns `Ok`.
    fn unknown_topic_response(&self, fetch: &FetchTopic) -> Result<FetchableTopicResponse> {
        Ok(FetchableTopicResponse::default()
            .topic(fetch.topic.clone())
            .topic_id(Some([0; 16]))
            .partitions(fetch.partitions.as_ref().map(|partitions| {
                partitions
                    .iter()
                    .map(|partition| {
                        PartitionData::default()
                            .partition_index(partition.partition)
                            .error_code(ErrorCode::UnknownTopicOrPartition.into())
                            .high_watermark(0)
                            .last_stable_offset(Some(0))
                            .log_start_offset(Some(-1))
                            .diverging_epoch(Some(
                                EpochEndOffset::default().epoch(-1).end_offset(-1),
                            ))
                            .current_leader(Some(
                                LeaderIdAndEpoch::default().leader_id(0).leader_epoch(0),
                            ))
                            .snapshot_id(Some(SnapshotId::default().end_offset(-1).epoch(-1)))
                            .aborted_transactions(Some([].into()))
                            .preferred_read_replica(Some(-1))
                            .records(None)
                    })
                    .collect()
            })))
    }

    /// Fetches every requested partition of one topic.
    ///
    /// The topic is first resolved through storage metadata. If it cannot be found (or has no
    /// name), [`Self::unknown_topic_response`] is returned instead.
    ///
    /// `max_bytes` is the response-wide byte budget shared with the caller. It is charged with
    /// what each partition actually consumed.
    ///
    /// `is_first_non_empty` is also shared across topics:
    /// - While it is `true`, no partition has yet returned records, and a partition may use the
    ///   whole remaining budget.
    /// - Once a partition returns records, it becomes `false`, and each later partition is also
    ///   capped by its own `partition_max_bytes`.
    ///
    /// # Errors
    ///
    /// Propagates metadata and storage errors.
    // Each argument is an independent request limit threaded down from `fetch`.
    #[allow(clippy::too_many_arguments)]
    #[instrument(skip(self, ctx, min_bytes, isolation, fetch))]
    async fn fetch_topic<G>(
        &self,
        ctx: &Context<G>,
        max_wait: Duration,
        min_bytes: u32,
        max_bytes: &mut u32,
        isolation: IsolationLevel,
        fetch: &FetchTopic,
        is_first_non_empty: &mut bool,
    ) -> Result<FetchableTopicResponse>
    where
        G: Storage,
    {
        let started_at = Instant::now();
        let metadata = ctx.state().metadata(Some(&[fetch.into()])).await?;

        if let Some(MetadataResponseTopic {
            topic_id,
            name: Some(name),
            ..
        }) = metadata.topics().first()
        {
            let mut partitions = Vec::new();

            for fetch_partition in fetch.partitions.iter().flatten() {
                // Until some partition has returned records, offer the whole remaining budget.
                // This is intended to mirror Kafka's guarantee that the first non-empty
                // partition can always make progress.
                let partition_max_bytes = if *is_first_non_empty {
                    *max_bytes
                } else {
                    min(fetch_partition.partition_max_bytes as u32, *max_bytes)
                };
                let mut partition_bytes = partition_max_bytes;
                debug!(partition_bytes, is_first_non_empty);

                let remaining = max_wait.saturating_sub(started_at.elapsed());
                let partition = self
                    .fetch_partition(
                        ctx,
                        remaining,
                        min_bytes,
                        &mut partition_bytes,
                        isolation,
                        name,
                        fetch_partition,
                    )
                    .await?;

                // `fetch_partition` reports "no records" as `None`, so only a frame that
                // actually holds batches ends the "first non-empty" phase. An empty partition
                // must leave the flag untouched.
                let returned_records = partition
                    .records
                    .as_ref()
                    .is_some_and(|records| !records.batches.is_empty());
                *is_first_non_empty = *is_first_non_empty && !returned_records;
                debug!(partition_bytes, is_first_non_empty);

                // Charge the shared budget with what this partition actually consumed.
                *max_bytes =
                    max_bytes.saturating_sub(partition_max_bytes.saturating_sub(partition_bytes));
                partitions.push(partition);
            }

            Ok(FetchableTopicResponse::default()
                .topic(fetch.topic.to_owned())
                .topic_id(topic_id.to_owned())
                .partitions(Some(partitions)))
        } else {
            self.unknown_topic_response(fetch)
        }
    }

    /// Fetches all requested `topics`, long-polling until enough data is available.
    ///
    /// Each attempt reads every topic from its requested fetch offsets. It starts with a fresh
    /// copy of the `max_bytes` budget and a fresh "first non-empty partition" flag, because the
    /// data from an earlier attempt is discarded and read again.
    ///
    /// The latest attempt is returned as soon as one of these holds:
    /// - it contains at least `min_bytes` bytes of records;
    /// - `max_wait` has elapsed;
    /// - the remaining wait is shorter than the attempt took.
    ///
    /// Otherwise the call sleeps for half of the remaining wait and retries. At least one
    /// attempt is always made, even when `max_wait` is zero.
    ///
    /// On return, `max_bytes` holds the budget left over after the returned attempt.
    ///
    /// # Errors
    ///
    /// Propagates metadata and storage errors, and fails if the response size exceeds `u32`.
    #[instrument(skip(self, ctx, isolation, topics))]
    pub(crate) async fn fetch<G>(
        &self,
        ctx: &Context<G>,
        max_wait: Duration,
        min_bytes: u32,
        max_bytes: &mut u32,
        isolation: IsolationLevel,
        topics: &[FetchTopic],
    ) -> Result<Vec<FetchableTopicResponse>>
    where
        G: Storage,
    {
        debug!(?isolation, ?topics);

        if topics.is_empty() {
            return Ok(vec![]);
        }

        // `Instant` is monotonic. `SystemTime::elapsed` fails (and would fail the whole fetch)
        // when the wall clock is stepped backwards.
        let started_at = Instant::now();
        let initial_max_bytes = *max_bytes;
        let mut iteration = 0;

        loop {
            let fetch_started_at = Instant::now();
            let mut attempt_max_bytes = initial_max_bytes;
            let mut is_first_non_empty = true;
            let mut responses = Vec::with_capacity(topics.len());

            for fetch in topics {
                let fetch_response = self
                    .fetch_topic(
                        ctx,
                        max_wait.saturating_sub(started_at.elapsed()),
                        min_bytes,
                        &mut attempt_max_bytes,
                        isolation,
                        fetch,
                        &mut is_first_non_empty,
                    )
                    .await?;
                responses.push(fetch_response);
            }

            *max_bytes = attempt_max_bytes;

            // Only this attempt's data is returned, so only this attempt's bytes count.
            let bytes = u32::try_from(responses.byte_size())?;
            let remaining = max_wait.saturating_sub(started_at.elapsed());
            let fetch_elapsed = fetch_started_at.elapsed();
            debug!(?iteration, ?max_wait, ?remaining, ?bytes, ?min_bytes, ?fetch_elapsed);

            if bytes >= min_bytes || remaining.is_zero() || remaining < fetch_elapsed {
                return Ok(responses);
            }

            sleep(remaining / 2).await;
            iteration += 1;
        }
    }
}
impl<G> Service<G, FetchRequest> for FetchService
where
    G: Storage,
{
    type Response = FetchResponse;
    type Error = Error;

    /// Decodes the request limits and delegates to [`FetchService::fetch`].
    ///
    /// Defaults applied to the request:
    /// - The isolation level defaults to `ReadUncommitted`.
    /// - `max_bytes` defaults to 5 MiB and is never allowed above it.
    /// - A request without topics yields an empty topic list.
    ///
    /// The response always reports no throttling, no error, session 0 and no node endpoints.
    ///
    /// # Errors
    ///
    /// Fails when a request field is out of range for its target type, or when the fetch fails.
    #[instrument(skip(ctx, req))]
    async fn serve(
        &self,
        ctx: Context<G>,
        req: FetchRequest,
    ) -> Result<Self::Response, Self::Error> {
        // Response-size cap, also used when the client does not send `max_bytes`.
        const DEFAULT_MAX_BYTES: u32 = 5 * 1024 * 1024;

        let started_at = SystemTime::now();

        let topic_responses = match req.topics {
            None => vec![],
            Some(topics) => {
                let isolation_level = match req.isolation_level {
                    Some(isolation) => IsolationLevel::try_from(isolation)?,
                    None => IsolationLevel::ReadUncommitted,
                };
                let max_wait = Duration::from_millis(u64::try_from(req.max_wait_ms)?);
                let min_bytes = u32::try_from(req.min_bytes)?;
                let mut max_bytes = match req.max_bytes {
                    Some(requested) => u32::try_from(requested)?.min(DEFAULT_MAX_BYTES),
                    None => DEFAULT_MAX_BYTES,
                };

                self.fetch(
                    &ctx,
                    max_wait,
                    min_bytes,
                    &mut max_bytes,
                    isolation_level,
                    &topics,
                )
                .await?
            }
        };

        let response = FetchResponse::default()
            .throttle_time_ms(Some(0))
            .error_code(Some(ErrorCode::None.into()))
            .session_id(Some(0))
            .node_endpoints(Some([].into()))
            .responses(Some(topic_responses));

        debug!(r = ?response, elapsed = ?started_at.elapsed().ok());
        Ok(response)
    }
}
/// Size, in bytes, that a value contributes when fetched data is charged against a byte budget.
trait ByteSize {
    /// Number of bytes this value accounts for.
    fn byte_size(&self) -> u64;
}

/// A vector accounts for the sum of its elements; an empty vector is zero.
impl<T: ByteSize> ByteSize for Vec<T> {
    fn byte_size(&self) -> u64 {
        self.iter().fold(0, |total, item| total + item.byte_size())
    }
}

/// A present value accounts for its own size; `None` is zero.
impl<T: ByteSize> ByteSize for Option<T> {
    fn byte_size(&self) -> u64 {
        match self {
            Some(inner) => inner.byte_size(),
            None => 0,
        }
    }
}
impl ByteSize for Batch {
fn byte_size(&self) -> u64 {
self.record_data.len() as u64
}
}
impl ByteSize for Frame {
fn byte_size(&self) -> u64 {
self.batches.byte_size()
}
}
impl ByteSize for PartitionData {
fn byte_size(&self) -> u64 {
self.records.byte_size()
}
}
impl ByteSize for FetchableTopicResponse {
fn byte_size(&self) -> u64 {
self.partitions.byte_size()
}
}