use std::io::Error as IoError;
use std::io::ErrorKind;
use tracing::{debug, trace};
use fluvio_protocol::record::ReplicaKey;
use fluvio_spu_schema::server::fetch_offset::FetchOffsetsRequest;
use fluvio_spu_schema::server::fetch_offset::FetchOffsetPartitionResponse;
use crate::FluvioError;
use fluvio_socket::VersionedSerialSocket;
/// Internal description of where in a partition an [`Offset`] points.
///
/// A value of this type is not yet a concrete position: it is turned into one
/// by `OffsetInner::resolve`, which combines it with the offsets reported for
/// the partition and, optionally, a consumer offset.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum OffsetInner {
    /// A concrete offset that is returned unchanged when resolved.
    Absolute(i64),
    /// A distance that is added to the base position when resolved (the
    /// consumer offset if one is supplied, otherwise the partition's
    /// `start_offset`).
    FromBeginning(i64),
    /// A distance that is subtracted from the base position when resolved
    /// (the consumer offset if one is supplied, otherwise the partition's
    /// `last_stable_offset`).
    FromEnd(i64),
}
impl OffsetInner {
    /// Converts this offset description into a concrete offset.
    ///
    /// * `Absolute` is returned unchanged; neither `offsets` nor
    ///   `consumer_offset` is consulted.
    /// * `FromBeginning(n)` yields `base + n`, where `base` is
    ///   `consumer_offset` when present and `offsets.start_offset` otherwise.
    /// * `FromEnd(n)` yields `base - n`, where `base` is `consumer_offset`
    ///   when present and `offsets.last_stable_offset` otherwise.
    ///
    /// Relative results are limited to the range
    /// `offsets.start_offset ..= offsets.last_stable_offset`.
    ///
    /// This function never panics: the arithmetic saturates instead of
    /// overflowing, and a response whose `start_offset` is greater than its
    /// `last_stable_offset` is tolerated (see `bound`).
    fn resolve(
        &self,
        offsets: &FetchOffsetPartitionResponse,
        consumer_offset: Option<i64>,
    ) -> i64 {
        let lower = offsets.start_offset;
        let upper = offsets.last_stable_offset;

        match self {
            Self::Absolute(offset) => *offset,
            Self::FromBeginning(offset) => {
                let base = consumer_offset.unwrap_or(lower);
                // Saturate so an extreme consumer offset cannot overflow.
                Self::bound(base.saturating_add(*offset), lower, upper)
            }
            Self::FromEnd(offset) => {
                let base = consumer_offset.unwrap_or(upper);
                // Saturate so an extreme consumer offset cannot overflow.
                Self::bound(base.saturating_sub(*offset), lower, upper)
            }
        }
    }

    /// Limits `value` to `lower ..= upper`.
    ///
    /// For `lower <= upper` this is exactly `value.clamp(lower, upper)`.
    /// `i64::clamp` panics when `lower > upper`; this helper instead applies
    /// the lower bound first and the upper bound last, so for such an
    /// inconsistent range the result is `upper`.
    fn bound(value: i64, lower: i64, upper: i64) -> i64 {
        value.max(lower).min(upper)
    }
}
/// A position in a partition, described either as an absolute offset or
/// relative to the beginning or the end.
///
/// Values are created through the associated constructor functions; the
/// wrapped representation is private to this module.
#[derive(Debug, Clone, PartialEq)]
pub struct Offset {
    /// The kind of offset together with its numeric value.
    inner: OffsetInner,
}
impl Offset {
    /// Creates an offset that refers to the exact position `index`.
    ///
    /// # Errors
    ///
    /// Returns [`FluvioError::NegativeOffset`] carrying `index` when `index`
    /// is negative.
    pub fn absolute(index: i64) -> Result<Offset, FluvioError> {
        if index >= 0 {
            let inner = OffsetInner::Absolute(index);
            Ok(Self { inner })
        } else {
            Err(FluvioError::NegativeOffset(index))
        }
    }

    /// Creates an offset at distance zero from the beginning.
    ///
    /// Equivalent to `Offset::from_beginning(0)`.
    pub fn beginning() -> Offset {
        Self::from_beginning(0)
    }

    /// Creates an offset that lies `offset` positions after the beginning.
    pub fn from_beginning(offset: u32) -> Offset {
        // A `u32` always fits into an `i64`, so the conversion is lossless.
        let inner = OffsetInner::FromBeginning(i64::from(offset));
        Self { inner }
    }

    /// Creates an offset at distance zero from the end.
    ///
    /// Equivalent to `Offset::from_end(0)`.
    pub fn end() -> Offset {
        Self::from_end(0)
    }

    /// Creates an offset that lies `offset` positions before the end.
    pub fn from_end(offset: u32) -> Offset {
        // A `u32` always fits into an `i64`, so the conversion is lossless.
        let inner = OffsetInner::FromEnd(i64::from(offset));
        Self { inner }
    }

    /// Resolves this offset to a concrete, non-negative position.
    ///
    /// The wrapped `OffsetInner` is resolved against `offsets` and the
    /// optional `consumer_offset`; a negative result is raised to `0`.
    ///
    /// The function is declared `async` and returns a `Result`, but the body
    /// neither awaits anything nor produces an `Err`.
    pub(crate) async fn resolve(
        &self,
        offsets: &FetchOffsetPartitionResponse,
        consumer_offset: Option<i64>,
    ) -> Result<i64, FluvioError> {
        let resolved = self.inner.resolve(offsets, consumer_offset);
        Ok(resolved.max(0))
    }
}
/// Requests the offsets of `replica` over `client` and returns the entry of
/// the response that belongs to that replica.
///
/// # Errors
///
/// * Any error produced by `send_receive` is propagated with `?`.
/// * If the response holds no entry for `replica`, an
///   `ErrorKind::InvalidData` I/O error is converted into a [`FluvioError`]
///   and returned.
pub(crate) async fn fetch_offsets(
    client: &mut VersionedSerialSocket,
    replica: &ReplicaKey,
) -> Result<FetchOffsetPartitionResponse, FluvioError> {
    debug!("fetching offset for replica: {}", replica);

    let request = FetchOffsetsRequest::new(replica.topic.to_owned(), replica.partition);
    let response = client.send_receive(request).await?;

    trace!(
        "receive fetch response replica: {}, {:#?}",
        replica, response
    );

    // A response without an entry for the requested replica is reported as
    // invalid data.
    let partition_response = response.find_partition(replica).ok_or_else(|| {
        IoError::new(
            ErrorKind::InvalidData,
            format!("no replica offset for: {replica}"),
        )
    })?;

    debug!("replica: {replica}, fetch offset: {partition_response}");
    Ok(partition_response)
}
#[cfg(test)]
mod tests {
    use super::*;
    use fluvio_spu_schema::server::fetch_offset::FetchOffsetPartitionResponse;

    /// Builds the partition response fixture shared by every test: default
    /// error code, partition index 0 and the given offset range.
    fn partition_offsets(start_offset: i64, last_stable_offset: i64) -> FetchOffsetPartitionResponse {
        FetchOffsetPartitionResponse {
            error_code: Default::default(),
            partition_index: 0,
            start_offset,
            last_stable_offset,
        }
    }

    // ---- no consumer offset: relative to the beginning ----

    #[test]
    fn test_offset_beginning_without_consumer_offset() {
        let offsets = partition_offsets(0, 10);
        assert_eq!(OffsetInner::FromBeginning(3).resolve(&offsets, None), 3);
    }

    #[test]
    fn test_offset_beginning_without_consumer_offset_start_nonzero() {
        let offsets = partition_offsets(5, 10);
        assert_eq!(OffsetInner::FromBeginning(3).resolve(&offsets, None), 8);
    }

    #[test]
    fn test_offset_beginning_without_consumer_offset_end_short() {
        let offsets = partition_offsets(0, 10);
        assert_eq!(OffsetInner::FromBeginning(15).resolve(&offsets, None), 10);
    }

    #[test]
    fn test_offset_beginning_without_consumer_offset_end_short_nonzero() {
        let offsets = partition_offsets(5, 10);
        assert_eq!(OffsetInner::FromBeginning(15).resolve(&offsets, None), 10);
    }

    // ---- no consumer offset: relative to the end ----

    #[test]
    fn test_offset_end_without_consumer_offset() {
        let offsets = partition_offsets(0, 10);
        assert_eq!(OffsetInner::FromEnd(3).resolve(&offsets, None), 7);
    }

    #[test]
    fn test_offset_end_without_consumer_offset_start_nonzero() {
        let offsets = partition_offsets(6, 10);
        assert_eq!(OffsetInner::FromEnd(6).resolve(&offsets, None), 6);
    }

    #[test]
    fn test_offset_end_without_consumer_offset_short() {
        let offsets = partition_offsets(0, 10);
        assert_eq!(OffsetInner::FromEnd(100).resolve(&offsets, None), 0);
    }

    // ---- with a consumer offset ----

    #[test]
    fn test_offset_absolute_with_consumer_offset() {
        let offsets = partition_offsets(0, 10);
        assert_eq!(OffsetInner::Absolute(4).resolve(&offsets, Some(100)), 4);
    }

    #[test]
    fn test_offset_beginning_with_consumer_offset() {
        let offsets = partition_offsets(0, 15);
        assert_eq!(OffsetInner::FromBeginning(3).resolve(&offsets, Some(10)), 13);
    }

    #[test]
    fn test_offset_beginning_with_consumer_offset_clamp() {
        let offsets = partition_offsets(10, 22);
        assert_eq!(OffsetInner::FromBeginning(5).resolve(&offsets, Some(20)), 22);
    }

    #[test]
    fn test_offset_end_with_consumer_offset() {
        let offsets = partition_offsets(0, 15);
        assert_eq!(OffsetInner::FromEnd(3).resolve(&offsets, Some(10)), 7);
    }

    #[test]
    fn test_offset_end_with_consumer_offset_clamp() {
        let offsets = partition_offsets(0, 15);
        assert_eq!(OffsetInner::FromEnd(10).resolve(&offsets, Some(5)), 0);
    }
}