use std::collections::{BTreeMap, HashMap};
use kafka_protocol::messages::share_acknowledge_request::AcknowledgementBatch as ShareAcknowledgeBatch;
use kafka_protocol::messages::share_fetch_request::AcknowledgementBatch as ShareFetchAcknowledgementBatch;
use super::types::{AcknowledgeType, ShareAcknowledgementCommit, TopicIdPartitionKey};
/// Threshold used by `optimise_acknowledgement_batch`: once this many consecutive
/// records share an acknowledge type and at least one more record of the same type
/// follows, the whole run is emitted as its own batch carrying a single type entry.
const MAX_RECORDS_WITH_SAME_ACKNOWLEDGE_TYPE: usize = 10;
/// Acknowledge types collected per record offset, kept sorted by offset.
#[derive(Debug, Clone, Default)]
pub(super) struct Acknowledgements {
/// Acknowledge type recorded for each offset. `add` always stores `Some(..)`;
/// a `None` entry is encoded as protocol value `0` when batches are built
/// (presumably the protocol's "gap" marker; confirm against the Kafka schema).
pub(super) offsets: BTreeMap<i64, Option<AcknowledgeType>>,
}
impl Acknowledgements {
pub(super) fn add(&mut self, offset: i64, acknowledge_type: AcknowledgeType) {
self.offsets.insert(offset, Some(acknowledge_type));
}
pub(super) fn is_empty(&self) -> bool {
self.offsets.is_empty()
}
pub(super) fn extend(&mut self, acknowledgements: Acknowledgements) {
self.offsets.extend(acknowledgements.offsets);
}
pub(super) fn into_share_fetch_batches(self) -> Vec<ShareFetchAcknowledgementBatch> {
acknowledgement_batches(self.offsets)
.into_iter()
.map(|batch| {
ShareFetchAcknowledgementBatch::default()
.with_first_offset(batch.first_offset)
.with_last_offset(batch.last_offset)
.with_acknowledge_types(batch.acknowledge_types)
})
.collect()
}
pub(super) fn into_share_acknowledge_batches(self) -> Vec<ShareAcknowledgeBatch> {
acknowledgement_batches(self.offsets)
.into_iter()
.map(|batch| {
ShareAcknowledgeBatch::default()
.with_first_offset(batch.first_offset)
.with_last_offset(batch.last_offset)
.with_acknowledge_types(batch.acknowledge_types)
})
.collect()
}
}
/// Intermediate, request-agnostic batch: an inclusive range of consecutive offsets
/// plus the acknowledge types (as protocol `i8` values) that apply to it.
struct ProtocolAcknowledgementBatch {
/// First offset covered by the batch (inclusive).
first_offset: i64,
/// Last offset covered by the batch (inclusive).
last_offset: i64,
/// Either one value per offset in the range, in offset order, or, after
/// optimisation, a single value that stands for every offset in the range.
acknowledge_types: Vec<i8>,
}
pub(super) fn share_acknowledgement_commits(
acknowledgements: &HashMap<TopicIdPartitionKey, Acknowledgements>,
error: Option<String>,
) -> Vec<ShareAcknowledgementCommit> {
acknowledgements
.iter()
.filter_map(|(key, acks)| share_acknowledgement_commit(key, acks, error.clone()))
.collect()
}
/// Builds the commit record for a single partition.
///
/// Returns `None` when `acks` holds no offsets. Otherwise the commit copies the
/// topic name, topic id and partition from `key`, lists every acknowledged offset
/// in ascending order, and stores `error` unchanged.
pub(super) fn share_acknowledgement_commit(
    key: &TopicIdPartitionKey,
    acks: &Acknowledgements,
    error: Option<String>,
) -> Option<ShareAcknowledgementCommit> {
    if acks.offsets.is_empty() {
        return None;
    }

    Some(ShareAcknowledgementCommit {
        topic: key.topic.clone(),
        topic_id: key.topic_id,
        partition: key.partition,
        offsets: acks.offsets.keys().copied().collect::<Vec<_>>(),
        error,
    })
}
fn acknowledgement_batches(
offsets: BTreeMap<i64, Option<AcknowledgeType>>,
) -> Vec<ProtocolAcknowledgementBatch> {
let mut batches = Vec::new();
let mut current: Option<ProtocolAcknowledgementBatch> = None;
for (offset, acknowledge_type) in offsets {
let protocol_value = acknowledge_type
.map(AcknowledgeType::protocol_value)
.unwrap_or(0);
match current.as_mut() {
Some(batch) if batch.last_offset + 1 == offset => {
batch.last_offset = offset;
batch.acknowledge_types.push(protocol_value);
}
Some(_) => {
batches.extend(optimise_acknowledgement_batch(
current.take().expect("current batch exists"),
));
current = Some(ProtocolAcknowledgementBatch {
first_offset: offset,
last_offset: offset,
acknowledge_types: vec![protocol_value],
});
}
None => {
current = Some(ProtocolAcknowledgementBatch {
first_offset: offset,
last_offset: offset,
acknowledge_types: vec![protocol_value],
});
}
}
}
if let Some(batch) = current {
batches.extend(optimise_acknowledgement_batch(batch));
}
batches
}
/// Splits a batch of consecutive offsets so that long runs of one acknowledge type
/// are sent compactly.
///
/// The type list is scanned as maximal runs of equal values. A run longer than
/// `MAX_RECORDS_WITH_SAME_ACKNOWLEDGE_TYPE` becomes its own batch carrying a single
/// type entry. Records accumulated before such a run are emitted first as a
/// per-record batch, and records left after the last long run form a final batch.
/// Every resulting batch is then passed through `optimise_single_acknowledge_type`.
/// A batch with no acknowledge types yields no output.
fn optimise_acknowledgement_batch(
    batch: ProtocolAcknowledgementBatch,
) -> Vec<ProtocolAcknowledgementBatch> {
    let types = &batch.acknowledge_types;
    let total = types.len();
    let base_offset = batch.first_offset;
    // Offset of the record whose type is stored at `index`.
    let offset_at = move |index: usize| base_offset + index as i64;

    let mut pieces = Vec::new();
    // First index that has not been emitted into any piece yet.
    let mut pending_start = 0usize;
    let mut run_start = 0usize;

    while run_start < total {
        let run_type = types[run_start];
        let run_len = types[run_start..]
            .iter()
            .take_while(|&&value| value == run_type)
            .count();
        let run_end = run_start + run_len; // exclusive

        if run_len > MAX_RECORDS_WITH_SAME_ACKNOWLEDGE_TYPE {
            // Flush the records accumulated before the long run, if any.
            if pending_start < run_start {
                pieces.push(ProtocolAcknowledgementBatch {
                    first_offset: offset_at(pending_start),
                    last_offset: offset_at(run_start - 1),
                    acknowledge_types: types[pending_start..run_start].to_vec(),
                });
            }
            // The long run itself needs only one type entry for the whole range.
            pieces.push(ProtocolAcknowledgementBatch {
                first_offset: offset_at(run_start),
                last_offset: offset_at(run_end - 1),
                acknowledge_types: vec![run_type],
            });
            pending_start = run_end;
        }

        run_start = run_end;
    }

    // Whatever follows the last long run (or the whole batch when there was none).
    if pending_start < total {
        pieces.push(ProtocolAcknowledgementBatch {
            first_offset: offset_at(pending_start),
            last_offset: offset_at(total - 1),
            acknowledge_types: types[pending_start..].to_vec(),
        });
    }

    pieces
        .into_iter()
        .map(optimise_single_acknowledge_type)
        .collect()
}
/// Collapses a batch whose records all share one acknowledge type.
///
/// When the batch lists more than one type and they are all equal, the list is
/// replaced by a single entry; the offset range is unchanged. Any other batch
/// (empty, single entry, or mixed types) is returned as is.
fn optimise_single_acknowledge_type(
    batch: ProtocolAcknowledgementBatch,
) -> ProtocolAcknowledgementBatch {
    // Copy the shared value out so the borrow of `batch` ends before it is moved.
    let shared_type = match batch.acknowledge_types.split_first() {
        Some((first, rest)) if !rest.is_empty() && rest.iter().all(|value| value == first) => {
            Some(*first)
        }
        _ => None,
    };

    match shared_type {
        Some(value) => ProtocolAcknowledgementBatch {
            acknowledge_types: vec![value],
            ..batch
        },
        None => batch,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    /// Asserts the offset range and acknowledge types of one produced batch.
    fn assert_batch(batch: &ProtocolAcknowledgementBatch, first: i64, last: i64, types: &[i8]) {
        assert_eq!(batch.first_offset, first);
        assert_eq!(batch.last_offset, last);
        assert_eq!(batch.acknowledge_types.as_slice(), types);
    }

    /// Consecutive offsets share a batch; a hole in the offsets starts a new one.
    #[test]
    fn acknowledgement_batches_group_contiguous_offsets() {
        let acks = BTreeMap::from([
            (4, Some(AcknowledgeType::Accept)),
            (5, Some(AcknowledgeType::Release)),
            (9, Some(AcknowledgeType::Reject)),
        ]);

        let batches = acknowledgement_batches(acks);

        assert_eq!(batches.len(), 2);
        assert_batch(&batches[0], 4, 5, &[1, 2]);
        assert_batch(&batches[1], 9, 9, &[3]);
    }

    /// A batch made of a single acknowledge type keeps only one type entry.
    #[test]
    fn acknowledgement_batches_optimise_single_type_ranges() {
        let acks: BTreeMap<i64, Option<AcknowledgeType>> = (10..=15)
            .map(|offset| (offset, Some(AcknowledgeType::Accept)))
            .collect();

        let batches = acknowledgement_batches(acks);

        assert_eq!(batches.len(), 1);
        assert_batch(&batches[0], 10, 15, &[1]);
    }

    /// A long same-type run in the middle of a mixed batch is split out on its own.
    #[test]
    fn acknowledgement_batches_split_long_same_type_runs_inside_mixed_batch() {
        let acks: BTreeMap<i64, Option<AcknowledgeType>> =
            std::iter::once((0, Some(AcknowledgeType::Release)))
                .chain((1..=12).map(|offset| (offset, Some(AcknowledgeType::Accept))))
                .chain(std::iter::once((13, Some(AcknowledgeType::Reject))))
                .collect();

        let batches = acknowledgement_batches(acks);

        assert_eq!(batches.len(), 3);
        assert_batch(&batches[0], 0, 0, &[2]);
        assert_batch(&batches[1], 1, 12, &[1]);
        assert_batch(&batches[2], 13, 13, &[3]);
    }

    /// Commits carry the partition identity, its offsets in order, and the error.
    #[test]
    fn acknowledgement_commits_copy_offsets_by_partition() {
        let topic_id = Uuid::new_v4();
        let key = TopicIdPartitionKey {
            topic: "orders".to_owned(),
            topic_id,
            partition: 2,
        };
        let mut acks = Acknowledgements::default();
        acks.add(4, AcknowledgeType::Accept);
        acks.add(7, AcknowledgeType::Release);
        let by_partition = HashMap::from([(key, acks)]);

        let commits = share_acknowledgement_commits(&by_partition, Some("failed".to_owned()));

        assert_eq!(commits.len(), 1);
        let commit = &commits[0];
        assert_eq!(commit.topic, "orders");
        assert_eq!(commit.topic_id, topic_id);
        assert_eq!(commit.partition, 2);
        assert_eq!(commit.offsets, vec![4, 7]);
        assert_eq!(commit.error.as_deref(), Some("failed"));
    }
}