use bytes::Bytes;
use crate::connection::Connection;
use crate::error::ClientError;
use crabka_protocol::owned::fetch_request::{FetchPartition, FetchRequest, FetchTopic};
use crabka_protocol::primitives::uuid::Uuid as WireUuid;
/// A single record produced by decoding a fetch response.
#[derive(Debug, Clone)]
pub struct FetchedRecord {
/// Absolute offset of the record: the containing batch's base offset plus
/// the record's offset delta.
pub offset: i64,
/// Key copied from the decoded record; `None` when the record carries none.
pub key: Option<Bytes>,
/// Value copied from the decoded record; `None` when the record carries none.
pub value: Option<Bytes>,
}
/// Fetches records from a single partition using isolation level `0`.
///
/// This is a convenience wrapper around [`fetch_partition_with_isolation`];
/// every argument is forwarded unchanged.
///
/// # Errors
///
/// Returns whatever error [`fetch_partition_with_isolation`] reports.
pub async fn fetch_partition(
    conn: &Connection,
    topic: &str,
    topic_id: WireUuid,
    partition: i32,
    fetch_offset: i64,
    max_wait_ms: i32,
    partition_max_bytes: i32,
) -> Result<Vec<FetchedRecord>, ClientError> {
    // Isolation level 0 is READ_UNCOMMITTED in the Kafka Fetch API.
    const READ_UNCOMMITTED: i8 = 0;

    let fetch = fetch_partition_with_isolation(
        conn,
        topic,
        topic_id,
        partition,
        fetch_offset,
        max_wait_ms,
        partition_max_bytes,
        READ_UNCOMMITTED,
    );
    fetch.await
}
/// Sends a single-topic, single-partition fetch request and decodes the
/// records returned for that partition.
///
/// The request asks for at least 1 byte (`min_bytes`), caps the whole
/// response at 50 MiB (`max_bytes`), and leaves every other request field at
/// its `Default` value.
///
/// Only records whose offset is `>= fetch_offset` are returned, sorted by
/// ascending offset.
///
/// # Errors
///
/// Propagates any error from `conn.send`, and any error produced while
/// decoding the response (for example a non-zero partition error code).
// Every argument is an independent knob of the request and the signature is
// public, so the argument-count lint is silenced instead of reshaping the API.
#[allow(clippy::too_many_arguments)]
pub async fn fetch_partition_with_isolation(
    conn: &Connection,
    topic: &str,
    topic_id: WireUuid,
    partition: i32,
    fetch_offset: i64,
    max_wait_ms: i32,
    partition_max_bytes: i32,
    isolation_level: i8,
) -> Result<Vec<FetchedRecord>, ClientError> {
    let request = FetchRequest {
        max_wait_ms,
        min_bytes: 1,
        max_bytes: 50 * 1024 * 1024,
        isolation_level,
        topics: vec![FetchTopic {
            topic: topic.to_string(),
            topic_id,
            partitions: vec![FetchPartition {
                partition,
                fetch_offset,
                partition_max_bytes,
                ..Default::default()
            }],
            ..Default::default()
        }],
        ..Default::default()
    };

    let resp = conn.send(request).await?;
    let mut records = decode_fetch_response(&resp, partition)?;

    // A Kafka broker serves whole record batches, so the batch that contains
    // `fetch_offset` may start before it. Drop those leading records so the
    // caller never sees data it did not ask for (and has likely already seen).
    records.retain(|record| record.offset >= fetch_offset);

    Ok(records)
}
fn decode_fetch_response(
resp: &crabka_protocol::owned::fetch_response::FetchResponse,
partition: i32,
) -> Result<Vec<FetchedRecord>, ClientError> {
let mut out = Vec::new();
for t in &resp.responses {
for p in &t.partitions {
if p.partition_index != partition {
continue;
}
if p.error_code != 0 {
return Err(ClientError::Server {
error_code: p.error_code,
});
}
let Some(payload) = &p.records else { continue };
let Some(batches) = payload.as_v2() else {
continue;
};
for batch in batches {
if batch.attributes.is_control_batch() {
continue;
}
for r in &batch.records {
out.push(FetchedRecord {
offset: batch.base_offset + i64::from(r.offset_delta),
key: r.key.clone(),
value: r.value.clone(),
});
}
}
}
}
out.sort_by_key(|r| r.offset);
Ok(out)
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use crabka_protocol::owned::fetch_response::{
        FetchResponse, FetchableTopicResponse, PartitionData,
    };
    use crabka_protocol::records::{Record, RecordBatch, RecordsPayload};

    /// Builds a batch starting at `base_offset` holding one value-only record
    /// per entry of `values`, with consecutive offset deltas starting at 0.
    fn batch_with(base_offset: i64, values: &[&[u8]]) -> RecordBatch {
        let last_offset_delta = i32::try_from(values.len().saturating_sub(1)).unwrap();
        let records = values
            .iter()
            .enumerate()
            .map(|(index, payload)| {
                let offset_delta = i32::try_from(index).unwrap();
                let value = Some(Bytes::copy_from_slice(payload));
                Record {
                    offset_delta,
                    value,
                    ..Default::default()
                }
            })
            .collect();
        RecordBatch {
            base_offset,
            last_offset_delta,
            records,
            ..Default::default()
        }
    }

    /// Wraps `partitions` in a response containing the single topic `"t"`.
    fn topic_response(partitions: Vec<PartitionData>) -> FetchResponse {
        FetchResponse {
            responses: vec![FetchableTopicResponse {
                topic: "t".into(),
                partitions,
                ..Default::default()
            }],
            ..Default::default()
        }
    }

    #[test]
    fn decode_yields_absolute_offsets_for_the_requested_partition() {
        let requested = PartitionData {
            partition_index: 0,
            high_watermark: 7,
            records: Some(RecordsPayload::from(vec![batch_with(5, &[b"a", b"b"])])),
            ..Default::default()
        };
        let unrelated = PartitionData {
            partition_index: 1,
            high_watermark: 1,
            records: Some(RecordsPayload::from(vec![batch_with(0, &[b"z"])])),
            ..Default::default()
        };
        let resp = topic_response(vec![requested, unrelated]);

        let got = decode_fetch_response(&resp, 0).unwrap();

        // Only partition 0's two records come back, at base offset 5 + delta.
        let offsets: Vec<i64> = got.iter().map(|record| record.offset).collect();
        assert!(offsets == [5, 6]);
        let values: Vec<Option<&[u8]>> = got.iter().map(|record| record.value.as_deref()).collect();
        assert!(values == [Some(&b"a"[..]), Some(&b"b"[..])]);
    }

    #[test]
    fn partition_error_code_surfaces_as_server_error() {
        let failing = PartitionData {
            partition_index: 0,
            error_code: 1,
            high_watermark: 0,
            records: None,
            ..Default::default()
        };
        let resp = topic_response(vec![failing]);

        let outcome = decode_fetch_response(&resp, 0);

        assert!(matches!(
            outcome,
            Err(ClientError::Server { error_code: 1 })
        ));
    }
}