use std::io::Error as IoError;
use std::io::ErrorKind;
use tracing::{debug, trace};
use dataplane::ReplicaKey;
use fluvio_spu_schema::server::fetch_offset::FetchOffsetsRequest;
use fluvio_spu_schema::server::fetch_offset::FetchOffsetPartitionResponse;
use crate::FluvioError;
use crate::client::SerialFrame;
/// Crate-private representation behind the public [`Offset`] wrapper.
///
/// Each variant carries the numeric component of the offset; the variant
/// itself decides how that number is interpreted when the offset is resolved
/// against a partition.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum OffsetInner {
/// An offset that is used as-is when resolved; no partition lookup is needed.
Absolute(i64),
/// A distance that is added to the partition's `start_offset` when resolved.
FromBeginning(i64),
/// A distance that is subtracted from the partition's `last_stable_offset`
/// when resolved.
FromEnd(i64),
}
/// A position in a partition, expressed either absolutely or relative to the
/// beginning or the end of the partition.
///
/// The representation is private; values are created through the associated
/// functions on this type (`absolute`, `beginning`, `from_beginning`, `end`
/// and `from_end`).
#[derive(Debug, Clone, PartialEq)]
pub struct Offset {
// Which kind of offset this is, together with its numeric component.
inner: OffsetInner,
}
impl Offset {
    /// Creates an offset that refers to the exact position `index`.
    ///
    /// # Errors
    ///
    /// Returns [`FluvioError::NegativeOffset`] when `index` is negative.
    pub fn absolute(index: i64) -> Result<Offset, FluvioError> {
        if index < 0 {
            return Err(FluvioError::NegativeOffset(index));
        }
        Ok(Self {
            inner: OffsetInner::Absolute(index),
        })
    }

    /// Creates an offset that refers to the beginning of the partition.
    ///
    /// Equivalent to `Offset::from_beginning(0)`.
    pub fn beginning() -> Offset {
        Self::from_beginning(0)
    }

    /// Creates an offset that lies `offset` positions after the beginning of
    /// the partition.
    pub fn from_beginning(offset: u32) -> Offset {
        Self {
            // `u32` always fits in `i64`, so the conversion is lossless.
            inner: OffsetInner::FromBeginning(i64::from(offset)),
        }
    }

    /// Creates an offset that refers to the end of the partition.
    ///
    /// Equivalent to `Offset::from_end(0)`.
    pub fn end() -> Offset {
        Offset::from_end(0)
    }

    /// Creates an offset that lies `offset` positions before the end of the
    /// partition.
    pub fn from_end(offset: u32) -> Offset {
        Self {
            // `u32` always fits in `i64`, so the conversion is lossless.
            inner: OffsetInner::FromEnd(i64::from(offset)),
        }
    }

    /// Resolves this offset into an absolute offset for the given
    /// `topic`/`partition`.
    ///
    /// * An absolute offset is returned unchanged, without contacting the
    ///   server.
    /// * A relative offset is resolved against the offsets obtained from
    ///   `fetch_offsets`: "from beginning" counts forward from
    ///   `start_offset`, "from end" counts backward from
    ///   `last_stable_offset`.
    ///
    /// A relative offset that would fall outside of
    /// `start_offset..=last_stable_offset` is clamped to the nearest bound,
    /// so the result never precedes the first available offset (and thus
    /// cannot become negative merely because the requested distance exceeds
    /// the number of records) and never points past the last stable offset.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `fetch_offsets`.
    pub(crate) async fn to_absolute<F, S: Into<String>>(
        &self,
        client: &mut F,
        topic: S,
        partition: i32,
    ) -> Result<i64, FluvioError>
    where
        F: SerialFrame,
    {
        let offset = match self.inner {
            OffsetInner::Absolute(offset) => offset,
            OffsetInner::FromBeginning(offset) => {
                let replica = ReplicaKey::new(topic, partition);
                let offsets = fetch_offsets(client, &replica).await?;
                let resolved = offsets.start_offset + offset;
                // Do not run past the end of the partition.
                resolved.min(offsets.last_stable_offset)
            }
            OffsetInner::FromEnd(offset) => {
                let replica = ReplicaKey::new(topic, partition);
                let offsets = fetch_offsets(client, &replica).await?;
                let resolved = offsets.last_stable_offset - offset;
                // Do not step back before the first available offset.
                resolved.max(offsets.start_offset)
            }
        };
        Ok(offset)
    }
}
/// Requests the offsets of `replica` through `client`.
///
/// Sends a `FetchOffsetsRequest` built from the replica's topic and partition
/// and returns the entry of the response that matches `replica`.
///
/// # Errors
///
/// Propagates any failure of `send_receive`. When the response contains no
/// entry for `replica`, an I/O error of kind `ErrorKind::InvalidData` is
/// converted into the returned `FluvioError`.
async fn fetch_offsets<F: SerialFrame>(
    client: &mut F,
    replica: &ReplicaKey,
) -> Result<FetchOffsetPartitionResponse, FluvioError> {
    debug!("fetching offset for replica: {}", replica);

    let request = FetchOffsetsRequest::new(replica.topic.to_owned(), replica.partition);
    let response = client.send_receive(request).await?;

    trace!(
        "receive fetch response replica: {}, {:#?}",
        replica,
        response
    );

    // Happy path first: hand back the matching partition entry.
    if let Some(partition_response) = response.find_partition(replica) {
        debug!("replica: {}, fetch offset: {}", replica, partition_response);
        return Ok(partition_response);
    }

    // The server answered, but not for the replica that was asked about.
    let missing = IoError::new(
        ErrorKind::InvalidData,
        format!("no replica offset for: {}", replica),
    );
    Err(missing.into())
}