mod common;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use anyhow::{Result, bail};
use bytes::Bytes;
use kafkit_client::{
AcknowledgeType, AdminConfig, ConsumerConfig, KafkaAdmin, KafkaProducer, KafkaShareConsumer,
NewTopic, ProduceRecord, ProducerConfig, RecordHeader, ShareAcknowledgementCommit,
ShareAcquireMode, ShareConsumerOptions, ShareRecord, ShareRecords, TopicPartition,
};
use crate::common::{KafkaHarness, unique_group, unique_topic};
#[tokio::test(flavor = "multi_thread")]
async fn share_consumer_accepts_and_commits_records() -> Result<()> {
let kafka = KafkaHarness::start().await?;
let topic = unique_topic("share-accept-commit");
let group = unique_group("share-accept-commit");
create_share_topic(kafka.bootstrap_server(), &topic, 1).await?;
let completed = Arc::new(Mutex::new(Vec::<ShareAcknowledgementCommit>::new()));
let completed_for_callback = Arc::clone(&completed);
let mut consumer = KafkaShareConsumer::connect_with_options(
ConsumerConfig::new(kafka.bootstrap_server(), group),
ShareConsumerOptions::default().with_acknowledgement_commit_callback(move |commits| {
completed_for_callback
.lock()
.expect("callback mutex")
.extend(commits);
}),
)
.await?;
consumer.subscribe(vec![topic.clone()]).await?;
wait_for_share_assignment(&mut consumer, Duration::from_secs(60)).await?;
let producer = KafkaProducer::connect(ProducerConfig::new(kafka.bootstrap_server())).await?;
producer
.send(ProduceRecord::new(
topic.clone(),
0,
Bytes::from_static(b"first"),
))
.await?;
producer
.send(ProduceRecord::new(
topic.clone(),
0,
Bytes::from_static(b"second"),
))
.await?;
let records = poll_share_until(
&mut consumer,
|records| records.len() >= 2,
Duration::from_secs(60),
)
.await?;
for record in records.iter() {
consumer.acknowledge(record, AcknowledgeType::Accept);
}
consumer.commit_sync().await?;
let committed_offsets = completed
.lock()
.expect("callback mutex")
.iter()
.flat_map(|commit| commit.offsets.iter().copied())
.collect::<Vec<_>>();
assert!(committed_offsets.contains(&0));
assert!(committed_offsets.contains(&1));
consumer.shutdown().await?;
producer.shutdown().await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn share_group_two_consumers_share_one_partition_and_both_receive_records() -> Result<()> {
let kafka = KafkaHarness::start().await?;
let topic = unique_topic("share-two-consumers-one-partition");
let group = unique_group("share-two-consumers-one-partition");
create_share_topic(kafka.bootstrap_server(), &topic, 1).await?;
let mut consumer_a = KafkaShareConsumer::connect_with_options(
ConsumerConfig::new(kafka.bootstrap_server(), group.clone()),
ShareConsumerOptions::default().with_max_poll_records(1),
)
.await?;
let mut consumer_b = KafkaShareConsumer::connect_with_options(
ConsumerConfig::new(kafka.bootstrap_server(), group),
ShareConsumerOptions::default().with_max_poll_records(1),
)
.await?;
consumer_a.subscribe(vec![topic.clone()]).await?;
consumer_b.subscribe(vec![topic.clone()]).await?;
wait_for_share_partition_assignment(&mut consumer_a, &topic, 0, Duration::from_secs(60))
.await?;
wait_for_share_partition_assignment(&mut consumer_b, &topic, 0, Duration::from_secs(60))
.await?;
let producer = KafkaProducer::connect(ProducerConfig::new(kafka.bootstrap_server())).await?;
for index in 0..20 {
producer
.send(ProduceRecord::new(
topic.clone(),
0,
Bytes::from(format!("shared-{index:02}")),
))
.await?;
}
let (consumer_a_values, consumer_b_values) = poll_two_share_consumers_until_both_receive(
&mut consumer_a,
&mut consumer_b,
Duration::from_secs(60),
)
.await?;
assert!(
!consumer_a_values.is_empty(),
"first share consumer should receive at least one record"
);
assert!(
!consumer_b_values.is_empty(),
"second share consumer should receive at least one record"
);
consumer_a.shutdown().await?;
consumer_b.shutdown().await?;
producer.shutdown().await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn share_consumer_release_makes_record_available_again() -> Result<()> {
let kafka = KafkaHarness::start().await?;
let topic = unique_topic("share-release-redelivery");
let group = unique_group("share-release-redelivery");
create_share_topic(kafka.bootstrap_server(), &topic, 1).await?;
let mut consumer =
KafkaShareConsumer::connect(ConsumerConfig::new(kafka.bootstrap_server(), group)).await?;
consumer.subscribe(vec![topic.clone()]).await?;
wait_for_share_assignment(&mut consumer, Duration::from_secs(60)).await?;
let producer = KafkaProducer::connect(ProducerConfig::new(kafka.bootstrap_server())).await?;
producer
.send(ProduceRecord::new(
topic.clone(),
0,
Bytes::from_static(b"retry-me"),
))
.await?;
let first = poll_share_until(
&mut consumer,
|records| {
records
.iter()
.any(|record| record.record.value.as_deref() == Some(&b"retry-me"[..]))
},
Duration::from_secs(60),
)
.await?;
let record = first
.iter()
.find(|record| record.record.value.as_deref() == Some(&b"retry-me"[..]))
.expect("retry-me should be fetched first");
consumer.acknowledge(record, AcknowledgeType::Release);
consumer.commit_sync().await?;
let redelivered = poll_share_until(
&mut consumer,
|records| {
records.iter().any(|record| {
record.record.value.as_deref() == Some(&b"retry-me"[..])
&& record.delivery_count >= 2
})
},
Duration::from_secs(60),
)
.await?;
let record = redelivered
.iter()
.find(|record| record.record.value.as_deref() == Some(&b"retry-me"[..]))
.expect("retry-me should be redelivered");
assert!(record.delivery_count >= 2);
consumer.acknowledge(record, AcknowledgeType::Accept);
consumer.commit_sync().await?;
consumer.shutdown().await?;
producer.shutdown().await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn share_consumer_reacquires_unacked_record_after_original_member_leaves() -> Result<()> {
let kafka = KafkaHarness::start().await?;
let topic = unique_topic("share-reacquire-after-leave");
let group = unique_group("share-reacquire-after-leave");
create_share_topic(kafka.bootstrap_server(), &topic, 1).await?;
let mut original =
KafkaShareConsumer::connect(ConsumerConfig::new(kafka.bootstrap_server(), group.clone()))
.await?;
original.subscribe(vec![topic.clone()]).await?;
wait_for_share_assignment(&mut original, Duration::from_secs(60)).await?;
let producer = KafkaProducer::connect(ProducerConfig::new(kafka.bootstrap_server())).await?;
producer
.send(ProduceRecord::new(
topic.clone(),
0,
Bytes::from_static(b"recover-after-leave"),
))
.await?;
let first = poll_share_until(
&mut original,
|records| {
records
.iter()
.any(|record| record.record.value.as_deref() == Some(&b"recover-after-leave"[..]))
},
Duration::from_secs(60),
)
.await?;
assert!(first.iter().any(|record| {
record.record.value.as_deref() == Some(&b"recover-after-leave"[..])
&& record.delivery_count == 1
}));
original.shutdown().await?;
let mut replacement =
KafkaShareConsumer::connect(ConsumerConfig::new(kafka.bootstrap_server(), group)).await?;
replacement.subscribe(vec![topic.clone()]).await?;
wait_for_share_assignment(&mut replacement, Duration::from_secs(60)).await?;
let recovered = poll_share_until(
&mut replacement,
|records| {
records.iter().any(|record| {
record.record.value.as_deref() == Some(&b"recover-after-leave"[..])
&& record.delivery_count >= 2
})
},
Duration::from_secs(60),
)
.await?;
let record = recovered
.iter()
.find(|record| record.record.value.as_deref() == Some(&b"recover-after-leave"[..]))
.expect("replacement consumer should reacquire the unacknowledged record");
assert!(record.delivery_count >= 2);
replacement.acknowledge(record, AcknowledgeType::Accept);
replacement.commit_sync().await?;
replacement.shutdown().await?;
producer.shutdown().await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn share_consumer_record_limit_mode_fetches_records() -> Result<()> {
let kafka = KafkaHarness::start().await?;
let topic = unique_topic("share-record-limit");
let group = unique_group("share-record-limit");
create_share_topic(kafka.bootstrap_server(), &topic, 2).await?;
let mut consumer = KafkaShareConsumer::connect_with_options(
ConsumerConfig::new(kafka.bootstrap_server(), group),
ShareConsumerOptions::default()
.with_share_acquire_mode(ShareAcquireMode::RecordLimit)
.with_max_poll_records(1),
)
.await?;
assert_eq!(consumer.share_acquire_mode(), ShareAcquireMode::RecordLimit);
consumer.subscribe(vec![topic.clone()]).await?;
wait_for_share_assignment(&mut consumer, Duration::from_secs(60)).await?;
let producer = KafkaProducer::connect(ProducerConfig::new(kafka.bootstrap_server())).await?;
producer
.send(ProduceRecord::new(
topic.clone(),
0,
Bytes::from_static(b"partition-zero"),
))
.await?;
producer
.send(ProduceRecord::new(
topic.clone(),
1,
Bytes::from_static(b"partition-one"),
))
.await?;
let records = poll_share_until(
&mut consumer,
|records| !records.is_empty(),
Duration::from_secs(60),
)
.await?;
for record in records.iter() {
consumer.acknowledge(record, AcknowledgeType::Accept);
}
consumer.commit_sync().await?;
consumer.shutdown().await?;
producer.shutdown().await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn share_consumer_round_trips_empty_payload_key_and_headers() -> Result<()> {
let kafka = KafkaHarness::start().await?;
let topic = unique_topic("share-empty-record");
let group = unique_group("share-empty-record");
create_share_topic(kafka.bootstrap_server(), &topic, 1).await?;
let mut consumer =
KafkaShareConsumer::connect(ConsumerConfig::new(kafka.bootstrap_server(), group)).await?;
consumer.subscribe(vec![topic.clone()]).await?;
wait_for_share_assignment(&mut consumer, Duration::from_secs(60)).await?;
let producer = KafkaProducer::connect(ProducerConfig::new(kafka.bootstrap_server())).await?;
producer
.send(
ProduceRecord::new(topic.clone(), 0, Bytes::new())
.with_key(Bytes::new())
.with_headers([
RecordHeader::new("present", Bytes::from_static(b"value")),
RecordHeader::new("empty", Bytes::new()),
RecordHeader::null("null"),
]),
)
.await?;
let records = poll_share_until(
&mut consumer,
|records| {
records.iter().any(|record| {
record.record.key.as_deref() == Some(&b""[..])
&& record.record.value.as_deref() == Some(&b""[..])
})
},
Duration::from_secs(60),
)
.await?;
let record = records
.iter()
.find(|record| record.record.value.as_deref() == Some(&b""[..]))
.expect("expected empty share record");
assert_eq!(record.record.headers.len(), 3);
assert_eq!(record.record.headers[0].key, "present");
assert_eq!(
record.record.headers[0].value.as_deref(),
Some(&b"value"[..])
);
assert_eq!(record.record.headers[1].key, "empty");
assert_eq!(record.record.headers[1].value.as_deref(), Some(&b""[..]));
assert_eq!(record.record.headers[2].key, "null");
assert!(record.record.headers[2].value.is_none());
consumer.acknowledge(record, AcknowledgeType::Accept);
consumer.commit_sync().await?;
consumer.shutdown().await?;
producer.shutdown().await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn share_consumer_subscribes_to_multiple_topics_and_acknowledges_each() -> Result<()> {
let kafka = KafkaHarness::start().await?;
let topic_a = unique_topic("share-multi-a");
let topic_b = unique_topic("share-multi-b");
let group = unique_group("share-multi");
let admin = KafkaAdmin::connect(AdminConfig::new(kafka.bootstrap_server())).await?;
admin
.create_topics([
NewTopic::new(topic_a.clone(), 1, 1),
NewTopic::new(topic_b.clone(), 1, 1),
])
.await?;
let mut consumer =
KafkaShareConsumer::connect(ConsumerConfig::new(kafka.bootstrap_server(), group)).await?;
consumer
.subscribe(vec![topic_a.clone(), topic_b.clone()])
.await?;
wait_for_share_assignment_count(&mut consumer, 2, Duration::from_secs(60)).await?;
let producer = KafkaProducer::connect(ProducerConfig::new(kafka.bootstrap_server())).await?;
producer
.send(ProduceRecord::new(
topic_a.clone(),
0,
Bytes::from_static(b"topic-a-value"),
))
.await?;
producer
.send(ProduceRecord::new(
topic_b.clone(),
0,
Bytes::from_static(b"topic-b-value"),
))
.await?;
let records = poll_share_until(
&mut consumer,
|records| {
records.iter().any(|record| {
record.record.topic == topic_a
&& record.record.value.as_deref() == Some(&b"topic-a-value"[..])
}) && records.iter().any(|record| {
record.record.topic == topic_b
&& record.record.value.as_deref() == Some(&b"topic-b-value"[..])
})
},
Duration::from_secs(60),
)
.await?;
for record in records.iter() {
consumer.acknowledge(record, AcknowledgeType::Accept);
}
consumer.commit_sync().await?;
consumer.shutdown().await?;
producer.shutdown().await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn share_consumer_reject_does_not_redeliver_and_does_not_block_later_records() -> Result<()> {
let kafka = KafkaHarness::start().await?;
let topic = unique_topic("share-reject");
let group = unique_group("share-reject");
create_share_topic(kafka.bootstrap_server(), &topic, 1).await?;
let mut consumer =
KafkaShareConsumer::connect(ConsumerConfig::new(kafka.bootstrap_server(), group)).await?;
consumer.subscribe(vec![topic.clone()]).await?;
wait_for_share_assignment(&mut consumer, Duration::from_secs(60)).await?;
let producer = KafkaProducer::connect(ProducerConfig::new(kafka.bootstrap_server())).await?;
producer
.send(ProduceRecord::new(
topic.clone(),
0,
Bytes::from_static(b"reject-me"),
))
.await?;
let records = poll_share_until(
&mut consumer,
|records| {
records
.iter()
.any(|record| record.record.value.as_deref() == Some(&b"reject-me"[..]))
},
Duration::from_secs(60),
)
.await?;
let record = records
.iter()
.find(|record| record.record.value.as_deref() == Some(&b"reject-me"[..]))
.expect("reject-me should be fetched");
consumer.acknowledge(record, AcknowledgeType::Reject);
consumer.commit_sync().await?;
let redelivered =
poll_share_for_value(&mut consumer, b"reject-me", Duration::from_secs(3)).await?;
assert!(
!redelivered,
"rejected share record should not be redelivered"
);
producer
.send(ProduceRecord::new(
topic.clone(),
0,
Bytes::from_static(b"after-reject"),
))
.await?;
let later = poll_share_until(
&mut consumer,
|records| {
records
.iter()
.any(|record| record.record.value.as_deref() == Some(&b"after-reject"[..]))
},
Duration::from_secs(60),
)
.await?;
let record = later
.iter()
.find(|record| record.record.value.as_deref() == Some(&b"after-reject"[..]))
.expect("after-reject should be fetched");
consumer.acknowledge(record, AcknowledgeType::Accept);
consumer.commit_sync().await?;
consumer.shutdown().await?;
producer.shutdown().await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn share_consumer_released_record_can_be_picked_up_by_another_consumer() -> Result<()> {
let kafka = KafkaHarness::start().await?;
let topic = unique_topic("share-release-other-consumer");
let group = unique_group("share-release-other-consumer");
create_share_topic(kafka.bootstrap_server(), &topic, 1).await?;
let mut consumer_a = KafkaShareConsumer::connect_with_options(
ConsumerConfig::new(kafka.bootstrap_server(), group.clone()),
ShareConsumerOptions::default().with_max_poll_records(1),
)
.await?;
let mut consumer_b = KafkaShareConsumer::connect_with_options(
ConsumerConfig::new(kafka.bootstrap_server(), group),
ShareConsumerOptions::default().with_max_poll_records(1),
)
.await?;
consumer_a.subscribe(vec![topic.clone()]).await?;
consumer_b.subscribe(vec![topic.clone()]).await?;
wait_for_share_partition_assignment(&mut consumer_a, &topic, 0, Duration::from_secs(60))
.await?;
wait_for_share_partition_assignment(&mut consumer_b, &topic, 0, Duration::from_secs(60))
.await?;
let producer = KafkaProducer::connect(ProducerConfig::new(kafka.bootstrap_server())).await?;
producer
.send(ProduceRecord::new(
topic.clone(),
0,
Bytes::from_static(b"release-to-other"),
))
.await?;
let first = poll_share_value_from_either(
&mut consumer_a,
&mut consumer_b,
b"release-to-other",
Duration::from_secs(60),
)
.await?;
match first.consumer {
PolledConsumer::First => {
consumer_a.acknowledge(&first.record, AcknowledgeType::Release);
consumer_a.commit_sync().await?;
}
PolledConsumer::Second => {
consumer_b.acknowledge(&first.record, AcknowledgeType::Release);
consumer_b.commit_sync().await?;
}
}
let redelivered = poll_share_value_from_either_with_delivery_count(
&mut consumer_a,
&mut consumer_b,
b"release-to-other",
2,
Duration::from_secs(60),
)
.await?;
match redelivered.consumer {
PolledConsumer::First => {
consumer_a.acknowledge(&redelivered.record, AcknowledgeType::Accept);
consumer_a.commit_sync().await?;
}
PolledConsumer::Second => {
consumer_b.acknowledge(&redelivered.record, AcknowledgeType::Accept);
consumer_b.commit_sync().await?;
}
}
consumer_a.shutdown().await?;
consumer_b.shutdown().await?;
producer.shutdown().await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn share_consumer_empty_commit_is_noop() -> Result<()> {
let kafka = KafkaHarness::start().await?;
let topic = unique_topic("share-empty-commit");
let group = unique_group("share-empty-commit");
create_share_topic(kafka.bootstrap_server(), &topic, 1).await?;
let completed = Arc::new(Mutex::new(Vec::<ShareAcknowledgementCommit>::new()));
let completed_for_callback = Arc::clone(&completed);
let mut consumer = KafkaShareConsumer::connect_with_options(
ConsumerConfig::new(kafka.bootstrap_server(), group),
ShareConsumerOptions::default().with_acknowledgement_commit_callback(move |commits| {
completed_for_callback
.lock()
.expect("callback mutex")
.extend(commits);
}),
)
.await?;
consumer.subscribe(vec![topic]).await?;
wait_for_share_assignment(&mut consumer, Duration::from_secs(60)).await?;
consumer.commit_sync().await?;
assert!(completed.lock().expect("callback mutex").is_empty());
consumer.shutdown().await?;
Ok(())
}
async fn create_share_topic(bootstrap_server: &str, topic: &str, partitions: i32) -> Result<()> {
let admin = KafkaAdmin::connect(AdminConfig::new(bootstrap_server)).await?;
admin
.create_topics([NewTopic::new(topic.to_owned(), partitions, 1)])
.await?;
Ok(())
}
async fn poll_share_until<F>(
consumer: &mut KafkaShareConsumer,
predicate: F,
timeout: Duration,
) -> Result<ShareRecords>
where
F: Fn(&ShareRecords) -> bool,
{
let deadline = Instant::now() + timeout;
loop {
let records = consumer.poll_for(Duration::from_millis(500)).await?;
if predicate(&records) {
return Ok(records);
}
if Instant::now() >= deadline {
bail!("timed out waiting for share records");
}
}
}
async fn wait_for_share_assignment(
consumer: &mut KafkaShareConsumer,
timeout: Duration,
) -> Result<()> {
let deadline = Instant::now() + timeout;
loop {
let _ = consumer.poll_for(Duration::from_millis(500)).await?;
if !consumer.assignment().is_empty() {
return Ok(());
}
if Instant::now() >= deadline {
bail!("timed out waiting for share assignment");
}
}
}
async fn wait_for_share_assignment_count(
consumer: &mut KafkaShareConsumer,
expected_count: usize,
timeout: Duration,
) -> Result<()> {
let deadline = Instant::now() + timeout;
loop {
let _ = consumer.poll_for(Duration::from_millis(500)).await?;
if consumer.assignment().len() >= expected_count {
return Ok(());
}
if Instant::now() >= deadline {
bail!("timed out waiting for {expected_count} share assignments");
}
}
}
async fn wait_for_share_partition_assignment(
consumer: &mut KafkaShareConsumer,
topic: &str,
partition: i32,
timeout: Duration,
) -> Result<()> {
let expected = TopicPartition::new(topic.to_owned(), partition);
let deadline = Instant::now() + timeout;
loop {
let _ = consumer.poll_for(Duration::from_millis(500)).await?;
if consumer.assignment().contains(&expected) {
return Ok(());
}
if Instant::now() >= deadline {
bail!("timed out waiting for share assignment {topic}:{partition}");
}
}
}
async fn poll_two_share_consumers_until_both_receive(
consumer_a: &mut KafkaShareConsumer,
consumer_b: &mut KafkaShareConsumer,
timeout: Duration,
) -> Result<(Vec<Vec<u8>>, Vec<Vec<u8>>)> {
let deadline = Instant::now() + timeout;
let mut consumer_a_values = Vec::new();
let mut consumer_b_values = Vec::new();
loop {
collect_and_accept_share_records(consumer_a, &mut consumer_a_values).await?;
collect_and_accept_share_records(consumer_b, &mut consumer_b_values).await?;
if !consumer_a_values.is_empty() && !consumer_b_values.is_empty() {
return Ok((consumer_a_values, consumer_b_values));
}
if Instant::now() >= deadline {
bail!(
"timed out waiting for both share consumers to receive records: first={}, second={}",
consumer_a_values.len(),
consumer_b_values.len()
);
}
}
}
async fn collect_and_accept_share_records(
consumer: &mut KafkaShareConsumer,
values: &mut Vec<Vec<u8>>,
) -> Result<()> {
let records = consumer.poll_for(Duration::from_millis(250)).await?;
if records.is_empty() {
return Ok(());
}
for record in records.iter() {
if let Some(value) = &record.record.value {
values.push(value.to_vec());
}
consumer.acknowledge(record, AcknowledgeType::Accept);
}
consumer.commit_sync().await?;
Ok(())
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PolledConsumer {
First,
Second,
}
#[derive(Debug, Clone)]
struct PolledShareRecord {
consumer: PolledConsumer,
record: ShareRecord,
}
async fn poll_share_value_from_either(
consumer_a: &mut KafkaShareConsumer,
consumer_b: &mut KafkaShareConsumer,
value: &[u8],
timeout: Duration,
) -> Result<PolledShareRecord> {
poll_share_value_from_either_with_delivery_count(consumer_a, consumer_b, value, 1, timeout)
.await
}
async fn poll_share_value_from_either_with_delivery_count(
consumer_a: &mut KafkaShareConsumer,
consumer_b: &mut KafkaShareConsumer,
value: &[u8],
min_delivery_count: i16,
timeout: Duration,
) -> Result<PolledShareRecord> {
let deadline = Instant::now() + timeout;
loop {
if let Some(record) = poll_share_value_once(consumer_a, value, min_delivery_count).await? {
return Ok(PolledShareRecord {
consumer: PolledConsumer::First,
record,
});
}
if let Some(record) = poll_share_value_once(consumer_b, value, min_delivery_count).await? {
return Ok(PolledShareRecord {
consumer: PolledConsumer::Second,
record,
});
}
if Instant::now() >= deadline {
bail!(
"timed out waiting for share value {:?} with delivery_count >= {}",
String::from_utf8_lossy(value),
min_delivery_count
);
}
}
}
async fn poll_share_value_once(
consumer: &mut KafkaShareConsumer,
value: &[u8],
min_delivery_count: i16,
) -> Result<Option<ShareRecord>> {
let records = consumer.poll_for(Duration::from_millis(250)).await?;
Ok(records
.iter()
.find(|record| {
record.record.value.as_deref() == Some(value)
&& record.delivery_count >= min_delivery_count
})
.cloned())
}
async fn poll_share_for_value(
consumer: &mut KafkaShareConsumer,
value: &[u8],
timeout: Duration,
) -> Result<bool> {
let deadline = Instant::now() + timeout;
loop {
let records = consumer.poll_for(Duration::from_millis(500)).await?;
if records
.iter()
.any(|record| record.record.value.as_deref() == Some(value))
{
return Ok(true);
}
if Instant::now() >= deadline {
return Ok(false);
}
}
}