use crate::connection::Connection;
use crate::error::ClientError;
use crabka_protocol::owned::offset_for_leader_epoch_request::{
OffsetForLeaderEpochRequest, OffsetForLeaderPartition, OffsetForLeaderTopic,
};
use crabka_protocol::owned::offset_for_leader_epoch_response::OffsetForLeaderEpochResponse;
/// Per-partition answer extracted from an `OffsetForLeaderEpochResponse`.
///
/// This is a plain, copyable snapshot of the four fields this module reads
/// from the matching partition entry of the response.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct EpochEndOffset {
    /// Partition index the answer refers to.
    pub partition: i32,
    /// Leader epoch reported by the response entry.
    pub leader_epoch: i32,
    /// End offset reported by the response entry.
    pub end_offset: i64,
    /// Error code carried by the response entry, copied through unmodified.
    /// NOTE(review): presumably `0` means "no error" (Kafka convention) — confirm.
    pub error_code: i16,
}
/// Queries the epoch end offset of a single topic partition over `conn`.
///
/// Builds an `OffsetForLeaderEpochRequest` that contains exactly one topic
/// with exactly one partition, sends it with `Connection::send`, and then
/// extracts the entry for `topic` / `partition` from the response.
///
/// `current_leader_epoch` and `leader_epoch` are copied verbatim into the
/// request's partition entry; every request field not set here takes its
/// `Default` value.
///
/// # Errors
///
/// * An error from `Connection::send` is propagated with `?`.
/// * `ClientError::Server { error_code: -1 }` is returned when the response
///   holds no entry for the requested topic and partition.
///
/// The partition-level `error_code` in the response is not inspected here; it
/// is handed back inside the returned [`EpochEndOffset`] for the caller to
/// check.
pub async fn offset_for_leader_epoch(
    conn: &Connection,
    topic: &str,
    partition: i32,
    current_leader_epoch: i32,
    leader_epoch: i32,
) -> Result<EpochEndOffset, ClientError> {
    let partition_query = OffsetForLeaderPartition {
        partition,
        current_leader_epoch,
        leader_epoch,
        ..Default::default()
    };
    let topic_query = OffsetForLeaderTopic {
        topic: topic.to_string(),
        partitions: vec![partition_query],
        ..Default::default()
    };
    let request = OffsetForLeaderEpochRequest {
        // NOTE(review): -1 presumably marks the sender as an ordinary client
        // rather than a broker replica (Kafka convention) — confirm.
        replica_id: -1,
        topics: vec![topic_query],
        ..Default::default()
    };

    let resp: OffsetForLeaderEpochResponse = conn.send(request).await?;
    parse_single(&resp, topic, partition)
}
/// Picks the answer for `topic` / `partition` out of `resp`.
///
/// Only the first topic entry whose name equals `topic` is consulted; within
/// it, the first partition entry whose index equals `partition` is copied into
/// an [`EpochEndOffset`]. The entry's `error_code` is passed through as-is.
///
/// # Errors
///
/// Returns `ClientError::Server { error_code: -1 }` when the topic entry or the
/// partition entry is absent from the response.
fn parse_single(
    resp: &OffsetForLeaderEpochResponse,
    topic: &str,
    partition: i32,
) -> Result<EpochEndOffset, ClientError> {
    // The same error is used for "topic absent" and "partition absent".
    let not_found = || ClientError::Server { error_code: -1 };

    let topic_entry = resp
        .topics
        .iter()
        .find(|t| t.topic == topic)
        .ok_or_else(not_found)?;
    let answer = topic_entry
        .partitions
        .iter()
        .find(|p| p.partition == partition)
        .ok_or_else(not_found)?;

    Ok(EpochEndOffset {
        partition: answer.partition,
        leader_epoch: answer.leader_epoch,
        end_offset: answer.end_offset,
        error_code: answer.error_code,
    })
}
/// Unit tests for `parse_single`, driven by hand-built responses.
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use crabka_protocol::owned::offset_for_leader_epoch_response::{
        EpochEndOffset as WireEpochEndOffset, OffsetForLeaderEpochResponse,
        OffsetForLeaderTopicResult,
    };

    /// A response with one topic "t" holding partition 0 yields that entry's fields.
    #[test]
    fn parse_single_extracts_partition_answer() {
        let wire_entry = WireEpochEndOffset {
            partition: 0,
            leader_epoch: 2,
            end_offset: 42,
            error_code: 0,
            ..Default::default()
        };
        let topic_result = OffsetForLeaderTopicResult {
            topic: "t".into(),
            partitions: vec![wire_entry],
            ..Default::default()
        };
        let resp = OffsetForLeaderEpochResponse {
            topics: vec![topic_result],
            ..Default::default()
        };

        let expected = EpochEndOffset {
            partition: 0,
            leader_epoch: 2,
            end_offset: 42,
            error_code: 0,
        };
        let got = parse_single(&resp, "t", 0).unwrap();
        assert!(got == expected);
    }

    /// A default-constructed response has no entry for "t"/0, so the lookup fails.
    #[test]
    fn parse_single_missing_partition_is_error() {
        let resp = OffsetForLeaderEpochResponse::default();
        let outcome = parse_single(&resp, "t", 0);
        assert!(outcome.is_err());
    }
}