use std::collections::HashMap;
use std::time::Duration;
use crabka_protocol::owned::share_acknowledge_request::{
AcknowledgePartition, AcknowledgeTopic, AcknowledgementBatch as AckAckBatch,
ShareAcknowledgeRequest,
};
use crabka_protocol::owned::share_fetch_request::{
AcknowledgementBatch as FetchAckBatch, FetchPartition, FetchTopic, ShareFetchRequest,
};
use crabka_protocol::primitives::uuid::Uuid as WireUuid;
use super::consumer::ShareConsumer;
use super::types::{ShareAckMode, ShareAckType, ShareConsumerRecord};
use crate::error::ConsumerError;
const MAX_BYTES: i32 = 50 * 1024 * 1024;
const PARTITION_MAX_BYTES: i32 = 1 << 20;
const MAX_RECORDS: i32 = 500;
impl ShareConsumer {
#[allow(clippy::too_many_lines)]
pub async fn poll(
&mut self,
timeout: Duration,
) -> Result<Vec<ShareConsumerRecord>, ConsumerError> {
let assignment = self.assignment.lock().await.clone();
if assignment.is_empty() {
tokio::time::sleep(timeout).await;
return Ok(Vec::new());
}
let acks = self.take_piggyback_acks();
let mut by_topic: HashMap<WireUuid, Vec<(i32, Vec<FetchAckBatch>)>> = HashMap::new();
for (tid, _name, partition) in &assignment {
let packs = acks.get(&(*tid, *partition)).cloned().unwrap_or_default();
by_topic.entry(*tid).or_default().push((*partition, packs));
}
let topics: Vec<FetchTopic> = by_topic
.into_iter()
.map(|(topic_id, parts)| FetchTopic {
topic_id,
partitions: parts
.into_iter()
.map(
|(partition_index, acknowledgement_batches)| FetchPartition {
partition_index,
partition_max_bytes: PARTITION_MAX_BYTES,
acknowledgement_batches,
..Default::default()
},
)
.collect(),
..Default::default()
})
.collect();
let timeout_ms = i32::try_from(timeout.as_millis()).unwrap_or(i32::MAX);
let resp = self
.client
.send(ShareFetchRequest {
group_id: Some(self.group_id.clone()),
member_id: Some(self.member_id.clone()),
share_session_epoch: self.share_session_epoch,
max_wait_ms: timeout_ms,
min_bytes: 1,
max_bytes: MAX_BYTES,
max_records: MAX_RECORDS,
batch_size: MAX_RECORDS,
share_acquire_mode: 0,
is_renew_ack: false,
topics,
forgotten_topics_data: vec![],
..Default::default()
})
.await?;
if resp.error_code != 0 {
return Err(ConsumerError::Server(resp.error_code));
}
self.share_session_epoch = self.share_session_epoch.wrapping_add(1);
let name_for: HashMap<WireUuid, String> = assignment
.iter()
.map(|(tid, name, _)| (*tid, name.clone()))
.collect();
let mut out: Vec<ShareConsumerRecord> = Vec::new();
let mut delivered: Vec<(WireUuid, i32, i64, i64)> = Vec::new();
for topic in &resp.responses {
let topic_name = name_for.get(&topic.topic_id).cloned().unwrap_or_default();
for part in &topic.partitions {
if part.acknowledge_error_code != 0 {
tracing::warn!(
topic = %topic_name,
partition = part.partition_index,
acknowledge_error_code = part.acknowledge_error_code,
"share fetch piggyback acknowledge error"
);
}
if part.error_code != 0 {
tracing::warn!(
topic = %topic_name,
partition = part.partition_index,
error_code = part.error_code,
"share fetch partition error"
);
continue;
}
for ar in &part.acquired_records {
delivered.push((
topic.topic_id,
part.partition_index,
ar.first_offset,
ar.last_offset,
));
}
let Some(payload) = &part.records else {
continue;
};
let Some(batches) = payload.as_v2() else {
continue;
};
for batch in batches {
if batch.attributes.is_control_batch() {
continue;
}
for r in &batch.records {
let offset = batch.base_offset + i64::from(r.offset_delta);
let delivery_count = part
.acquired_records
.iter()
.find(|ar| ar.first_offset <= offset && offset <= ar.last_offset)
.map_or(0, |ar| ar.delivery_count);
out.push(ShareConsumerRecord {
topic: topic_name.clone(),
partition: part.partition_index,
offset,
timestamp: batch.base_timestamp + r.timestamp_delta,
key: r.key.clone(),
value: r.value.clone(),
delivery_count,
});
}
}
}
}
self.prev_delivered = delivered;
Ok(out)
}
pub fn acknowledge(
&mut self,
record: &ShareConsumerRecord,
ack: ShareAckType,
) -> Result<(), ConsumerError> {
if self.ack_mode == ShareAckMode::Implicit {
return Err(ConsumerError::IllegalState(
"acknowledge() is not allowed in implicit ack mode; \
records are auto-accepted on the next poll/close"
.into(),
));
}
let topic_id = self.topic_id_for(&record.topic);
self.pending_acks.push((
topic_id,
record.partition,
record.offset,
record.offset,
ack.wire(),
));
Ok(())
}
pub async fn renew(&mut self, record: &ShareConsumerRecord) -> Result<(), ConsumerError> {
if self.ack_mode == ShareAckMode::Implicit {
return Err(ConsumerError::IllegalState(
"renew() is not allowed in implicit ack mode; \
records are auto-accepted on the next poll/close"
.into(),
));
}
let topic_id = self.topic_id_for(&record.topic);
let topics = vec![AcknowledgeTopic {
topic_id,
partitions: vec![AcknowledgePartition {
partition_index: record.partition,
acknowledgement_batches: vec![AckAckBatch {
first_offset: record.offset,
last_offset: record.offset,
acknowledge_types: vec![],
..Default::default()
}],
..Default::default()
}],
..Default::default()
}];
let resp = self
.client
.send(ShareAcknowledgeRequest {
group_id: Some(self.group_id.clone()),
member_id: Some(self.member_id.clone()),
share_session_epoch: self.share_session_epoch,
is_renew_ack: true,
topics,
..Default::default()
})
.await?;
if resp.error_code != 0 {
return Err(ConsumerError::Server(resp.error_code));
}
self.share_session_epoch = self.share_session_epoch.wrapping_add(1);
Ok(())
}
pub async fn commit(&mut self) -> Result<(), ConsumerError> {
self.flush_pending_acks().await
}
pub(crate) async fn flush_pending_acks(&mut self) -> Result<(), ConsumerError> {
if self.pending_acks.is_empty() {
return Ok(());
}
let drained = std::mem::take(&mut self.pending_acks);
let topics = build_ack_topics(drained);
let resp = self
.client
.send(ShareAcknowledgeRequest {
group_id: Some(self.group_id.clone()),
member_id: Some(self.member_id.clone()),
share_session_epoch: self.share_session_epoch,
is_renew_ack: false,
topics,
..Default::default()
})
.await?;
if resp.error_code != 0 {
return Err(ConsumerError::Server(resp.error_code));
}
self.share_session_epoch = self.share_session_epoch.wrapping_add(1);
Ok(())
}
fn take_piggyback_acks(&mut self) -> HashMap<(WireUuid, i32), Vec<FetchAckBatch>> {
let mut out: HashMap<(WireUuid, i32), Vec<FetchAckBatch>> = HashMap::new();
match self.ack_mode {
ShareAckMode::Implicit => {
for (tid, partition, first, last) in std::mem::take(&mut self.prev_delivered) {
let count = usize::try_from(last - first + 1).unwrap_or(0);
out.entry((tid, partition))
.or_default()
.push(FetchAckBatch {
first_offset: first,
last_offset: last,
acknowledge_types: vec![ShareAckType::Accept.wire(); count],
..Default::default()
});
}
}
ShareAckMode::Explicit => {
for (tid, partition, first, last, ack) in std::mem::take(&mut self.pending_acks) {
let count = usize::try_from(last - first + 1).unwrap_or(0);
out.entry((tid, partition))
.or_default()
.push(FetchAckBatch {
first_offset: first,
last_offset: last,
acknowledge_types: vec![ack; count],
..Default::default()
});
}
self.prev_delivered.clear();
}
}
out
}
fn topic_id_for(&self, name: &str) -> WireUuid {
if let Ok(assignment) = self.assignment.try_lock()
&& let Some((tid, _, _)) = assignment.iter().find(|(_, n, _)| n == name)
{
return *tid;
}
if let Ok(names) = self.topic_names.try_lock()
&& let Some((tid, _)) = names.iter().find(|(_, n)| n.as_str() == name)
{
return *tid;
}
WireUuid::default()
}
}
fn build_ack_topics(acks: Vec<(WireUuid, i32, i64, i64, i8)>) -> Vec<AcknowledgeTopic> {
let mut by_topic: HashMap<WireUuid, HashMap<i32, Vec<AckAckBatch>>> = HashMap::new();
for (tid, partition, first, last, ack) in acks {
let count = usize::try_from(last - first + 1).unwrap_or(0);
by_topic
.entry(tid)
.or_default()
.entry(partition)
.or_default()
.push(AckAckBatch {
first_offset: first,
last_offset: last,
acknowledge_types: vec![ack; count],
..Default::default()
});
}
by_topic
.into_iter()
.map(|(topic_id, parts)| AcknowledgeTopic {
topic_id,
partitions: parts
.into_iter()
.map(
|(partition_index, acknowledgement_batches)| AcknowledgePartition {
partition_index,
acknowledgement_batches,
..Default::default()
},
)
.collect(),
..Default::default()
})
.collect()
}