#![allow(unused_imports)]
mod common;
use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::{Result, anyhow, bail};
use bytes::Bytes;
use kafkit_client::{
AdminConfig, AlterConfigOp, AutoOffsetReset, ConfigResource, ConsumerConfig, ConsumerError,
ConsumerRebalanceEvent, Error, IsolationLevel, KafkaAdmin, KafkaClient, KafkaConsumer,
KafkaMessage, KafkaProducer, NewPartitions, NewTopic, ProduceRecord, ProducerCompression,
ProducerConfig, RecordHeader, SubscriptionPattern, TopicPartition, TopicPartitionTimestamp,
};
use crate::common::{
KafkaHarness, collect_values, expected_assignment, poll_describe_until, poll_until,
poll_until_assignment, poll_until_consumer_error, poll_until_with_admin, poll_until_with_api,
unique_group, unique_topic,
};
/// Exercises the consumer's query APIs against a topic created with two partitions.
///
/// Three records with explicit, increasing timestamps are produced to partition 0. The test
/// then checks `beginning_offsets`, `end_offsets`, `offsets_for_times`, `seek_to_timestamp`,
/// `list_topics` and `partitions_for`.
#[tokio::test(flavor = "multi_thread")]
async fn consumer_query_apis_expose_offsets_metadata_and_seek_by_timestamp() -> Result<()> {
    let harness = KafkaHarness::start().await?;
    let topic = unique_topic("consumer-query-apis");

    let admin = KafkaAdmin::connect(AdminConfig::new(harness.bootstrap_server())).await?;
    admin
        .create_topics([NewTopic::new(topic.clone(), 2, 1)])
        .await?;

    // Send the records one at a time so they land on partition 0 in this order.
    let producer =
        KafkaProducer::connect(ProducerConfig::new(harness.bootstrap_server())).await?;
    let timestamped_payloads = [
        (b"first".as_slice(), 1_700_000_000_100),
        (b"second".as_slice(), 1_700_000_000_200),
        (b"third".as_slice(), 1_700_000_000_300),
    ];
    for (payload, timestamp) in timestamped_payloads {
        producer
            .send(
                ProduceRecord::new(topic.clone(), 0, Bytes::from_static(payload))
                    .with_timestamp(timestamp),
            )
            .await?;
    }

    let consumer = KafkaConsumer::connect(
        ConsumerConfig::new(
            harness.bootstrap_server(),
            unique_group("consumer-query-apis"),
        )
        .with_auto_offset_reset(AutoOffsetReset::Earliest),
    )
    .await?;
    let partition_zero = TopicPartition::new(topic.clone(), 0);
    consumer.assign(vec![partition_zero.clone()]).await?;

    // Offset bounds of partition 0: three records occupy offsets 0..3.
    let earliest = consumer
        .beginning_offsets(vec![partition_zero.clone()])
        .await?;
    assert_eq!(earliest.len(), 1);
    assert_eq!(earliest[0].offset, 0);
    let latest = consumer.end_offsets(vec![partition_zero.clone()]).await?;
    assert_eq!(latest.len(), 1);
    assert_eq!(latest[0].offset, 3);

    // The timestamp of "second" must resolve to offset 1 with that exact timestamp.
    let at_second = consumer
        .offsets_for_times(vec![TopicPartitionTimestamp::new(
            topic.clone(),
            0,
            1_700_000_000_200,
        )])
        .await?;
    assert_eq!(at_second.len(), 1);
    assert_eq!(at_second[0].offset, 1);
    assert_eq!(at_second[0].timestamp, 1_700_000_000_200);

    // A timestamp later than every produced record yields no entry at all.
    let beyond_last = consumer
        .offsets_for_times(vec![TopicPartitionTimestamp::new(
            topic.clone(),
            0,
            9_999_999_999_999,
        )])
        .await?;
    assert!(beyond_last.is_empty());

    // After seeking by timestamp, polling must deliver "second".
    consumer
        .seek_to_timestamp(vec![TopicPartitionTimestamp::new(
            topic.clone(),
            0,
            1_700_000_000_200,
        )])
        .await?;
    let replayed = poll_until(
        &consumer,
        |batch| {
            batch
                .iter()
                .any(|message| message.value.as_deref() == Some(b"second".as_slice()))
        },
        Duration::from_secs(20),
    )
    .await?;
    assert!(
        replayed
            .iter()
            .any(|message| message.value.as_deref() == Some(b"second".as_slice()))
    );

    // Metadata lookups: the topic is listed and both partitions report a leader.
    let listed_topics = consumer.list_topics().await?;
    assert!(listed_topics.iter().any(|listed| listed == &topic));
    let partition_infos = consumer.partitions_for(topic.clone()).await?;
    assert_eq!(partition_infos.len(), 2);
    assert!(partition_infos.iter().all(|info| info.leader_id >= 0));

    consumer.shutdown().await?;
    producer.shutdown().await?;
    Ok(())
}
/// Verifies that an explicitly committed offset is reported by `committed`, both on the
/// consumer that committed it and on a new consumer connected with the same group id.
#[tokio::test(flavor = "multi_thread")]
async fn consumer_committed_offsets_are_visible_after_restart() -> Result<()> {
    let harness = KafkaHarness::start().await?;
    let topic = unique_topic("consumer-committed-query");
    let group = unique_group("consumer-committed-query");
    harness.create_topic(&topic).await?;

    let producer =
        KafkaProducer::connect(ProducerConfig::new(harness.bootstrap_server())).await?;
    let only_record = ProduceRecord::new(topic.clone(), 0, Bytes::from_static(b"first"));
    producer.send(only_record).await?;

    // First consumer: read the record and commit the batch that contained it.
    let initial_config = ConsumerConfig::new(harness.bootstrap_server(), group.clone())
        .with_auto_offset_reset(AutoOffsetReset::Earliest);
    let consumer = KafkaConsumer::connect(initial_config).await?;
    consumer.subscribe(vec![topic.clone()]).await?;
    let first_batch = poll_until(
        &consumer,
        |batch| {
            batch
                .iter()
                .any(|message| message.value.as_deref() == Some(b"first".as_slice()))
        },
        Duration::from_secs(20),
    )
    .await?;
    consumer.commit(&first_batch).await?;

    let partition = TopicPartition::new(topic.clone(), 0);
    let before_restart = consumer.committed(vec![partition.clone()]).await?;
    assert_eq!(before_restart.len(), 1);
    assert_eq!(before_restart[0].offset, 1);
    consumer.shutdown().await?;

    // Second consumer in the same group: it never subscribes, it only queries the offset.
    let restarted_config = ConsumerConfig::new(harness.bootstrap_server(), group)
        .with_auto_offset_reset(AutoOffsetReset::Earliest);
    let restarted = KafkaConsumer::connect(restarted_config).await?;
    let after_restart = restarted.committed(vec![partition]).await?;
    assert_eq!(after_restart.len(), 1);
    assert_eq!(after_restart[0].offset, 1);

    restarted.shutdown().await?;
    producer.shutdown().await?;
    Ok(())
}
/// Checks two behaviors of a subscribed consumer: `wakeup` makes an in-flight `poll_for`
/// fail, and after `unsubscribe` a poll returns no records even though one was produced.
#[tokio::test(flavor = "multi_thread")]
async fn consumer_wakeup_interrupts_poll_and_unsubscribe_clears_delivery() -> Result<()> {
    let harness = KafkaHarness::start().await?;
    let topic = unique_topic("consumer-wakeup");
    let group = unique_group("consumer-wakeup");
    harness.create_topic(&topic).await?;

    let consumer = KafkaConsumer::connect(
        ConsumerConfig::new(harness.bootstrap_server(), group)
            .with_auto_offset_reset(AutoOffsetReset::Earliest),
    )
    .await?;
    consumer.subscribe(vec![topic.clone()]).await?;

    // Run a 30s poll concurrently with a wakeup that fires after 100ms.
    let long_poll = consumer.poll_for(Duration::from_secs(30));
    let delayed_wakeup = async {
        tokio::time::sleep(Duration::from_millis(100)).await;
        consumer.wakeup().await
    };
    let (poll_result, wakeup_result) = tokio::join!(long_poll, delayed_wakeup);
    wakeup_result?;
    let error = poll_result.expect_err("poll should be interrupted by wakeup");
    assert!(error.to_string().contains("wakeup"));

    // The consumer must still deliver records after having been woken up.
    let producer =
        KafkaProducer::connect(ProducerConfig::new(harness.bootstrap_server())).await?;
    let before = ProduceRecord::new(topic.clone(), 0, Bytes::from_static(b"before-unsubscribe"));
    producer.send(before).await?;
    let delivered = poll_until(
        &consumer,
        |batch| {
            batch.iter().any(|message| {
                message.value.as_deref() == Some(b"before-unsubscribe".as_slice())
            })
        },
        Duration::from_secs(20),
    )
    .await?;
    assert_eq!(delivered.len(), 1);

    // Once unsubscribed, a freshly produced record must not be returned by poll.
    consumer.unsubscribe().await?;
    let after = ProduceRecord::new(topic.clone(), 0, Bytes::from_static(b"after-unsubscribe"));
    producer.send(after).await?;
    let drained = consumer.poll_for(Duration::from_secs(1)).await?;
    assert!(drained.is_empty());

    consumer.shutdown().await?;
    producer.shutdown().await?;
    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn consumer_rejects_concurrent_poll_and_prearmed_wakeup() -> Result<()> {
let kafka = KafkaHarness::start().await?;
let topic = unique_topic("consumer-concurrent-poll");
let group = unique_group("consumer-concurrent-poll");
kafka.create_topic(&topic).await?;
let consumer = KafkaConsumer::connect(
ConsumerConfig::new(kafka.bootstrap_server(), group)
.with_auto_offset_reset(AutoOffsetReset::Earliest),
)
.await?;
consumer.subscribe(vec![topic.clone()]).await?;
consumer.wakeup().await?;
let error = consumer
.poll_for(Duration::from_secs(1))
.await
.expect_err("pre-armed wakeup should interrupt the next poll");
assert!(matches!(error, Error::Consumer(ConsumerError::Wakeup)));
let (first_poll, second_poll, wakeup) = tokio::join!(
consumer.poll_for(Duration::from_secs(30)),
async {
tokio::time::sleep(Duration::from_millis(100)).await;
consumer.poll_for(Duration::from_millis(250)).await
},
async {
tokio::time::sleep(Duration::from_millis(300)).await;
consumer.wakeup().await
}
);
wakeup?;
assert!(matches!(
second_poll,
Err(Error::Consumer(ConsumerError::ConcurrentPoll))
));
assert!(matches!(
first_poll,
Err(Error::Consumer(ConsumerError::Wakeup))
));
consumer.shutdown().await?;
Ok(())
}
/// Checks the errors returned by a consumer that has no assignment: a negative seek offset
/// is rejected as `InvalidSeekOffset`, while `position`, `seek` and `pause` each fail with
/// `PartitionNotAssigned` carrying the expected `operation` label.
#[tokio::test(flavor = "multi_thread")]
async fn consumer_reports_invalid_seek_and_unassigned_partition_errors() -> Result<()> {
    let harness = KafkaHarness::start().await?;
    let topic = unique_topic("consumer-unassigned-errors");
    let partition = TopicPartition::new(topic.clone(), 0);
    harness.create_topic(&topic).await?;

    // The consumer is connected but never subscribes or assigns.
    let consumer = KafkaConsumer::connect(
        ConsumerConfig::new(
            harness.bootstrap_server(),
            unique_group("consumer-unassigned-errors"),
        )
        .with_auto_offset_reset(AutoOffsetReset::Earliest),
    )
    .await?;

    let negative_seek = consumer
        .seek(partition.clone(), -1)
        .await
        .expect_err("negative seek offsets should be rejected locally");
    assert!(matches!(
        negative_seek,
        Error::Consumer(ConsumerError::InvalidSeekOffset { offset: -1 })
    ));

    let unassigned_position = consumer
        .position(partition.clone())
        .await
        .expect_err("position should require an assigned partition");
    assert!(matches!(
        unassigned_position,
        Error::Consumer(ConsumerError::PartitionNotAssigned {
            operation: "position",
            ..
        })
    ));

    let unassigned_seek = consumer
        .seek(partition.clone(), 0)
        .await
        .expect_err("seek should require an assigned partition");
    assert!(matches!(
        unassigned_seek,
        Error::Consumer(ConsumerError::PartitionNotAssigned {
            operation: "seek",
            ..
        })
    ));

    // `pause` is expected to report the generic "operate on" label.
    let unassigned_pause = consumer
        .pause(vec![partition.clone()])
        .await
        .expect_err("pause should require an assigned partition");
    assert!(matches!(
        unassigned_pause,
        Error::Consumer(ConsumerError::PartitionNotAssigned {
            operation: "operate on",
            ..
        })
    ));

    consumer.shutdown().await?;
    Ok(())
}
/// Verifies that a consumer configured with auto-commit (200ms interval) advances the
/// group's committed offset past a consumed record, and that a restarted consumer in the same
/// group both sees that offset and does not receive the record again.
#[tokio::test(flavor = "multi_thread")]
async fn consumer_auto_commit_advances_group_offset_after_interval() -> Result<()> {
    let kafka = KafkaHarness::start().await?;
    let topic = unique_topic("consumer-auto-commit");
    let group = unique_group("consumer-auto-commit");
    let partition = TopicPartition::new(topic.clone(), 0);
    kafka.create_topic(&topic).await?;

    let producer = KafkaProducer::connect(ProducerConfig::new(kafka.bootstrap_server())).await?;
    producer
        .send(ProduceRecord::new(
            topic.clone(),
            0,
            Bytes::from_static(b"auto-commit-me"),
        ))
        .await?;

    let consumer = KafkaConsumer::connect(
        ConsumerConfig::new(kafka.bootstrap_server(), group.clone())
            .with_auto_offset_reset(AutoOffsetReset::Earliest)
            .with_enable_auto_commit(true)
            .with_auto_commit_interval(Duration::from_millis(200)),
    )
    .await?;
    consumer.subscribe(vec![topic.clone()]).await?;
    let records = poll_until(
        &consumer,
        |records| {
            records
                .iter()
                .any(|record| record.value.as_deref() == Some(&b"auto-commit-me"[..]))
        },
        Duration::from_secs(20),
    )
    .await?;
    assert!(!records.is_empty());

    // Wait up to 10s for the committed offset to reach 1, polling between checks.
    // NOTE(review): presumably auto-commit is driven from poll; confirm against the client.
    let deadline = Instant::now() + Duration::from_secs(10);
    loop {
        let committed = consumer.committed(vec![partition.clone()]).await?;
        if committed.iter().any(|offset| offset.offset == 1) {
            break;
        }
        if Instant::now() >= deadline {
            bail!("timed out waiting for auto-commit to advance group offset");
        }
        let _ = consumer.poll_for(Duration::from_millis(250)).await?;
    }
    consumer.shutdown().await?;
    producer.shutdown().await?;

    let restarted = KafkaConsumer::connect(
        ConsumerConfig::new(kafka.bootstrap_server(), group)
            .with_auto_offset_reset(AutoOffsetReset::Earliest),
    )
    .await?;

    // Assert on the stored group offset directly. The empty-poll check below would also pass
    // if the restarted consumer simply had not received its assignment within one second.
    let committed = restarted.committed(vec![partition]).await?;
    assert!(
        committed.iter().any(|offset| offset.offset == 1),
        "auto-committed offset should be visible to a restarted consumer in the same group"
    );

    restarted.subscribe(vec![topic]).await?;
    let replayed = restarted.poll_for(Duration::from_secs(1)).await?;
    assert!(
        replayed.is_empty(),
        "restarted consumer should begin after the auto-committed record"
    );
    restarted.shutdown().await?;
    Ok(())
}
/// Walks a manually assigned consumer through `seek`, `seek_to_beginning`, `seek_to_end`,
/// `pause` and `resume`, asserting the reported `position` after every step.
#[tokio::test(flavor = "multi_thread")]
async fn consumer_manual_assignment_supports_seek_position_pause_and_resume() -> Result<()> {
    let harness = KafkaHarness::start().await?;
    let topic = unique_topic("manual-assign");
    let partition = TopicPartition::new(topic.clone(), 0);
    harness.create_topic(&topic).await?;

    // Seed partition 0 with "zero", "one", "two" at offsets 0, 1 and 2.
    let producer =
        KafkaProducer::connect(ProducerConfig::new(harness.bootstrap_server())).await?;
    for payload in [b"zero".as_slice(), b"one".as_slice(), b"two".as_slice()] {
        let record = ProduceRecord::new(topic.clone(), 0, Bytes::copy_from_slice(payload));
        producer.send(record).await?;
    }

    let consumer = KafkaConsumer::connect(
        ConsumerConfig::new(harness.bootstrap_server(), unique_group("manual-assign"))
            .with_auto_offset_reset(AutoOffsetReset::Earliest),
    )
    .await?;
    consumer.assign(vec![partition.clone()]).await?;
    assert_eq!(consumer.position(partition.clone()).await?, 0);

    // Seek to offset 1: the next batch must be exactly "one" then "two".
    consumer.seek(partition.clone(), 1).await?;
    assert_eq!(consumer.position(partition.clone()).await?, 1);
    let from_one = poll_until(
        &consumer,
        |batch| {
            batch
                .iter()
                .any(|message| message.value.as_deref() == Some(b"one".as_slice()))
        },
        Duration::from_secs(20),
    )
    .await?;
    let values: Vec<_> = from_one
        .iter()
        .filter_map(|message| message.value.as_deref())
        .map(|bytes| String::from_utf8_lossy(bytes).into_owned())
        .collect();
    assert_eq!(values, ["one", "two"]);
    assert_eq!(consumer.position(partition.clone()).await?, 3);

    // Rewind to the start and make sure "zero" is delivered again.
    consumer.seek_to_beginning(vec![partition.clone()]).await?;
    assert_eq!(consumer.position(partition.clone()).await?, 0);
    let replayed = poll_until(
        &consumer,
        |batch| {
            batch
                .iter()
                .any(|message| message.value.as_deref() == Some(b"zero".as_slice()))
        },
        Duration::from_secs(20),
    )
    .await?;
    assert!(
        replayed
            .iter()
            .any(|message| message.value.as_deref() == Some(b"zero".as_slice()))
    );

    // Jump to the end, pause, and confirm a new record is held back while paused.
    consumer.seek_to_end(vec![partition.clone()]).await?;
    assert_eq!(consumer.position(partition.clone()).await?, 3);
    consumer.pause(vec![partition.clone()]).await?;
    let held_back = ProduceRecord::new(topic.clone(), 0, Bytes::from_static(b"after-pause"));
    producer.send(held_back).await?;
    let paused_batch = consumer.poll_for(Duration::from_secs(1)).await?;
    assert!(paused_batch.is_empty());
    assert_eq!(consumer.position(partition.clone()).await?, 3);

    // Resuming releases the held-back record and moves the position to 4.
    consumer.resume(vec![partition.clone()]).await?;
    let resumed = poll_until(
        &consumer,
        |batch| {
            batch
                .iter()
                .any(|message| message.value.as_deref() == Some(b"after-pause".as_slice()))
        },
        Duration::from_secs(20),
    )
    .await?;
    assert!(
        resumed
            .iter()
            .any(|message| message.value.as_deref() == Some(b"after-pause".as_slice()))
    );
    assert_eq!(consumer.position(partition.clone()).await?, 4);

    consumer.shutdown().await?;
    producer.shutdown().await?;
    Ok(())
}
/// Seeks a manually assigned consumer to offset 10_000 on a partition holding one record and
/// checks that, with `AutoOffsetReset::Earliest`, the record is still delivered and the
/// position ends at 1.
#[tokio::test(flavor = "multi_thread")]
async fn consumer_resets_out_of_range_fetch_offset_using_reset_policy() -> Result<()> {
    let harness = KafkaHarness::start().await?;
    let topic = unique_topic("out-of-range-reset");
    let partition = TopicPartition::new(topic.clone(), 0);
    harness.create_topic(&topic).await?;

    // Produce the single record, then release the producer before consuming.
    let producer =
        KafkaProducer::connect(ProducerConfig::new(harness.bootstrap_server())).await?;
    let only_record = ProduceRecord::new(topic.clone(), 0, Bytes::from_static(b"after-reset"));
    producer.send(only_record).await?;
    producer.shutdown().await?;

    let consumer = KafkaConsumer::connect(
        ConsumerConfig::new(
            harness.bootstrap_server(),
            unique_group("out-of-range-reset"),
        )
        .with_auto_offset_reset(AutoOffsetReset::Earliest),
    )
    .await?;
    consumer.assign(vec![partition.clone()]).await?;

    // Offset 10_000 lies far past the end of the partition.
    consumer.seek(partition.clone(), 10_000).await?;
    let delivered = poll_until(
        &consumer,
        |batch| {
            batch
                .iter()
                .any(|message| message.value.as_deref() == Some(b"after-reset".as_slice()))
        },
        Duration::from_secs(20),
    )
    .await?;
    assert!(
        delivered
            .iter()
            .any(|message| message.value.as_deref() == Some(b"after-reset".as_slice()))
    );
    assert_eq!(consumer.position(partition.clone()).await?, 1);

    consumer.shutdown().await?;
    Ok(())
}
/// Commits the first record with one consumer, shuts it down, produces a second record, and
/// checks that a new consumer in the same group receives "second" but never "first".
#[tokio::test(flavor = "multi_thread")]
async fn committed_offsets_survive_consumer_restart() -> Result<()> {
    let harness = KafkaHarness::start().await?;
    let topic = unique_topic("commit-restart");
    let group = unique_group("commit-restart");
    harness.create_topic(&topic).await?;

    let producer_config = ProducerConfig::new(harness.bootstrap_server())
        .with_compression(ProducerCompression::None);
    let producer = KafkaProducer::connect(producer_config).await?;
    let first_record = ProduceRecord::new(topic.clone(), 0, Bytes::from_static(b"first"));
    producer.send(first_record).await?;

    // First session: consume "first" and commit it before shutting down.
    let consumer = KafkaConsumer::connect(
        ConsumerConfig::new(harness.bootstrap_server(), group.clone())
            .with_auto_offset_reset(AutoOffsetReset::Earliest),
    )
    .await?;
    consumer.subscribe(vec![topic.clone()]).await?;
    let first_batch = poll_until(
        &consumer,
        |batch| {
            batch
                .iter()
                .any(|message| message.value.as_deref() == Some(b"first".as_slice()))
        },
        Duration::from_secs(20),
    )
    .await?;
    consumer.commit(&first_batch).await?;
    consumer.shutdown().await?;

    // "second" is produced while no consumer of the group is running.
    let second_record = ProduceRecord::new(topic.clone(), 0, Bytes::from_static(b"second"));
    producer.send(second_record).await?;

    // Second session: same group id, so consumption should continue after the commit.
    let consumer = KafkaConsumer::connect(
        ConsumerConfig::new(harness.bootstrap_server(), group)
            .with_auto_offset_reset(AutoOffsetReset::Earliest),
    )
    .await?;
    consumer.subscribe(vec![topic.clone()]).await?;
    let second_batch = poll_until(
        &consumer,
        |batch| {
            batch
                .iter()
                .any(|message| message.value.as_deref() == Some(b"second".as_slice()))
        },
        Duration::from_secs(20),
    )
    .await?;
    assert!(
        second_batch
            .iter()
            .all(|message| message.value.as_deref() != Some(b"first".as_slice()))
    );
    assert!(
        second_batch
            .iter()
            .any(|message| message.value.as_deref() == Some(b"second".as_slice()))
    );

    consumer.shutdown().await?;
    producer.shutdown().await?;
    Ok(())
}
/// Registers rebalance callbacks on two consumers of the same group and checks the recorded
/// events: the first consumer is assigned both partitions, then sees a revocation once the
/// second joins, and the second consumer records an assignment of its own.
#[tokio::test(flavor = "multi_thread")]
async fn consumer_rebalance_listener_observes_assignment_and_revocation() -> Result<()> {
    let harness = KafkaHarness::start().await?;
    let admin = KafkaAdmin::connect(AdminConfig::new(harness.bootstrap_server())).await?;
    let topic = unique_topic("rebalance-listener");
    let group = unique_group("rebalance-listener");
    admin
        .create_topics([NewTopic::new(topic.clone(), 2, 1)])
        .await?;

    // Consumer A: its callback appends every event to a shared, mutex-protected vector.
    let events_a = Arc::new(std::sync::Mutex::new(Vec::new()));
    let sink_a = Arc::clone(&events_a);
    let config_a = ConsumerConfig::new(harness.bootstrap_server(), group.clone())
        .with_rebalance_callback(move |event| {
            sink_a.lock().expect("rebalance events").push(event);
        });
    let consumer_a = KafkaConsumer::connect(config_a).await?;
    consumer_a.subscribe(vec![topic.clone()]).await?;
    poll_until_assignment(
        &consumer_a,
        expected_assignment(&topic, 2),
        Duration::from_secs(20),
    )
    .await?;
    assert!(has_rebalance_event(&events_a, |event| {
        matches!(event, ConsumerRebalanceEvent::Assigned(partitions) if partitions.len() == 2)
    }));

    // Consumer B joins the same group with its own event log.
    let events_b = Arc::new(std::sync::Mutex::new(Vec::new()));
    let sink_b = Arc::clone(&events_b);
    let config_b = ConsumerConfig::new(harness.bootstrap_server(), group)
        .with_rebalance_callback(move |event| {
            sink_b.lock().expect("rebalance events").push(event);
        });
    let consumer_b = KafkaConsumer::connect(config_b).await?;
    consumer_b.subscribe(vec![topic.clone()]).await?;
    poll_until_split_assignment(&consumer_a, &consumer_b, Duration::from_secs(20)).await?;

    // After the split, A must have given something up and B must have gained something.
    assert!(has_rebalance_event(&events_a, |event| {
        matches!(event, ConsumerRebalanceEvent::Revoked(partitions) if !partitions.is_empty())
    }));
    assert!(has_rebalance_event(&events_b, |event| {
        matches!(event, ConsumerRebalanceEvent::Assigned(partitions) if !partitions.is_empty())
    }));

    consumer_b.shutdown().await?;
    consumer_a.shutdown().await?;
    Ok(())
}
/// Follows a regex subscription through its lifecycle: the initial match, picking up a
/// matching topic created later, switching to a different pattern, and unsubscribing.
#[tokio::test(flavor = "multi_thread")]
async fn consumer_regex_subscription_lifecycle_matches_topic_updates() -> Result<()> {
    let harness = KafkaHarness::start().await?;
    let admin = KafkaAdmin::connect(AdminConfig::new(harness.bootstrap_server())).await?;
    let group = unique_group("regex-lifecycle");
    let matching_a = unique_topic("regex-match");
    let matching_b = unique_topic("regex-match");
    let other = unique_topic("regex-other");

    // Only `matching_a` and `other` exist at first; `matching_b` is created further down.
    let initial_topics = [
        NewTopic::new(matching_a.clone(), 2, 1),
        NewTopic::new(other.clone(), 1, 1),
    ];
    admin.create_topics(initial_topics).await?;

    let consumer = KafkaConsumer::connect(
        ConsumerConfig::new(harness.bootstrap_server(), group)
            .with_metadata_max_age(Duration::from_millis(200)),
    )
    .await?;

    // Step 1: the pattern initially yields just the two partitions of `matching_a`.
    consumer
        .subscribe_regex("regex-match-.*".to_owned())
        .await?;
    poll_until_assignment(
        &consumer,
        expected_assignment(&matching_a, 2),
        Duration::from_secs(20),
    )
    .await?;

    // Step 2: a newly created matching topic is added to the assignment.
    admin
        .create_topics([NewTopic::new(matching_b.clone(), 1, 1)])
        .await?;
    let mut both_matching = expected_assignment(&matching_a, 2);
    both_matching.extend(expected_assignment(&matching_b, 1));
    poll_until_assignment(&consumer, both_matching, Duration::from_secs(20)).await?;

    // Step 3: re-subscribing with another pattern replaces the assignment.
    consumer
        .subscribe_regex("regex-other-.*".to_owned())
        .await?;
    poll_until_assignment(
        &consumer,
        expected_assignment(&other, 1),
        Duration::from_secs(20),
    )
    .await?;

    // Step 4: unsubscribing leaves the consumer with no partitions.
    consumer.unsubscribe().await?;
    let remaining = consumer.assignment().await?;
    assert!(remaining.is_empty());

    consumer.shutdown().await?;
    Ok(())
}
/// Subscribes with the malformed pattern `(t.*c` and expects the consumer to report
/// `ConsumerError::InvalidRegularExpression`.
#[tokio::test(flavor = "multi_thread")]
async fn consumer_regex_subscription_rejects_invalid_patterns() -> Result<()> {
    let harness = KafkaHarness::start().await?;
    let config = ConsumerConfig::new(harness.bootstrap_server(), unique_group("regex-invalid"));
    let consumer = KafkaConsumer::connect(config).await?;

    // The subscribe call itself succeeds; the error is awaited from the consumer afterwards.
    let unbalanced = SubscriptionPattern::new("(t.*c");
    consumer.subscribe_pattern(unbalanced).await?;
    let reported = poll_until_consumer_error(&consumer, Duration::from_secs(10)).await?;
    assert!(matches!(
        reported,
        kafkit_client::Error::Consumer(ConsumerError::InvalidRegularExpression { .. })
    ));

    // Best-effort shutdown: its result is deliberately ignored after the reported error.
    let _ = consumer.shutdown().await;
    Ok(())
}
/// Configures the consumer with the server assignor name `not-a-real-assignor` and expects
/// it to report `ConsumerError::UnsupportedAssignor` after subscribing.
#[tokio::test(flavor = "multi_thread")]
async fn consumer_rejects_unsupported_assignor() -> Result<()> {
    let harness = KafkaHarness::start().await?;
    let topic = unique_topic("unsupported-assignor");
    harness.create_topic(&topic).await?;

    let config = ConsumerConfig::new(
        harness.bootstrap_server(),
        unique_group("unsupported-assignor"),
    )
    .with_server_assignor("not-a-real-assignor");
    let consumer = KafkaConsumer::connect(config).await?;
    consumer.subscribe(vec![topic]).await?;

    let reported = poll_until_consumer_error(&consumer, Duration::from_secs(10)).await?;
    assert!(matches!(
        reported,
        kafkit_client::Error::Consumer(ConsumerError::UnsupportedAssignor { .. })
    ));

    // Best-effort shutdown: its result is deliberately ignored after the reported error.
    let _ = consumer.shutdown().await;
    Ok(())
}
/// Starts two consumers with the same group and instance id, waits for one of them to report
/// a static-member conflict, then checks that a replacement using the same instance id
/// receives the full assignment once both earlier consumers have been shut down.
#[tokio::test(flavor = "multi_thread")]
async fn consumer_rejects_duplicate_static_members_until_the_original_leaves() -> Result<()> {
    let harness = KafkaHarness::start().await?;
    let admin = KafkaAdmin::connect(AdminConfig::new(harness.bootstrap_server())).await?;
    let topic = unique_topic("static-member-fence");
    let group = unique_group("static-member-fence");
    let instance_id = unique_group("static-instance");
    admin
        .create_topics([NewTopic::new(topic.clone(), 2, 1)])
        .await?;

    // The original member claims both partitions.
    let original_config = ConsumerConfig::new(harness.bootstrap_server(), group.clone())
        .with_instance_id(instance_id.clone());
    let original = KafkaConsumer::connect(original_config).await?;
    original.subscribe(vec![topic.clone()]).await?;
    let full_assignment = expected_assignment(&topic, 2);
    poll_until_assignment(&original, full_assignment.clone(), Duration::from_secs(20)).await?;

    // A second member reuses the same instance id, which must produce a conflict.
    let duplicate_config = ConsumerConfig::new(harness.bootstrap_server(), group.clone())
        .with_instance_id(instance_id.clone());
    let duplicate = KafkaConsumer::connect(duplicate_config).await?;
    duplicate.subscribe(vec![topic.clone()]).await?;
    poll_until_static_member_conflict(&original, &duplicate, Duration::from_secs(20)).await?;

    // Shutdown results are ignored here: one of the two consumers has reported a conflict.
    let _ = duplicate.shutdown().await;
    let _ = original.shutdown().await;

    // With both gone, the instance id can be reused by a replacement.
    let replacement_config =
        ConsumerConfig::new(harness.bootstrap_server(), group).with_instance_id(instance_id);
    let replacement = KafkaConsumer::connect(replacement_config).await?;
    replacement.subscribe(vec![topic]).await?;
    poll_until_assignment(&replacement, full_assignment, Duration::from_secs(20)).await?;

    replacement.shutdown().await?;
    Ok(())
}
/// Polls `duplicate` and then `original`, in that order, until either reports a
/// static-member conflict.
///
/// # Errors
///
/// Propagates any poll error that is not a static-member conflict, and fails with a timeout
/// message if no conflict was seen by the time `timeout` has elapsed. The deadline is only
/// checked after both consumers have been polled.
async fn poll_until_static_member_conflict(
    original: &KafkaConsumer,
    duplicate: &KafkaConsumer,
    timeout: Duration,
) -> Result<()> {
    let give_up_at = Instant::now() + timeout;
    loop {
        // The duplicate is checked first; the original is skipped once a conflict is found.
        for consumer in [duplicate, original] {
            if static_member_conflict_result(consumer).await?.is_some() {
                return Ok(());
            }
        }
        if give_up_at <= Instant::now() {
            bail!("timed out waiting for duplicate static member conflict");
        }
    }
}
async fn static_member_conflict_result(consumer: &KafkaConsumer) -> Result<Option<()>> {
match consumer.poll_for(Duration::from_millis(200)).await {
Ok(_) => Ok(None),
Err(error) if is_static_member_conflict(&error) => Ok(Some(())),
Err(error) => Err(error.into()),
}
}
fn is_static_member_conflict(error: &kafkit_client::Error) -> bool {
matches!(
error,
kafkit_client::Error::Consumer(
ConsumerError::UnreleasedInstanceId { .. } | ConsumerError::FencedInstanceId { .. }
)
)
}
/// Waits until both consumers report exactly one assigned partition each, re-checking every
/// 100ms.
///
/// # Errors
///
/// Propagates errors from `assignment()`, and fails with a message containing both current
/// assignments if the split has not happened by the time `timeout` has elapsed.
async fn poll_until_split_assignment(
    consumer_a: &KafkaConsumer,
    consumer_b: &KafkaConsumer,
    timeout: Duration,
) -> Result<()> {
    let deadline = Instant::now() + timeout;
    loop {
        let assignment_a = consumer_a.assignment().await?;
        let assignment_b = consumer_b.assignment().await?;
        if let (1, 1) = (assignment_a.len(), assignment_b.len()) {
            return Ok(());
        }
        if deadline <= Instant::now() {
            bail!("timed out waiting for split assignment; a={assignment_a:?}, b={assignment_b:?}");
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}
/// Returns `true` if any event recorded in `events` satisfies `predicate`.
///
/// # Panics
///
/// Panics with "rebalance events" if the mutex is poisoned.
fn has_rebalance_event(
    events: &Arc<std::sync::Mutex<Vec<ConsumerRebalanceEvent>>>,
    predicate: impl Fn(&ConsumerRebalanceEvent) -> bool,
) -> bool {
    // The lock is held only for the duration of the scan.
    let recorded = events.lock().expect("rebalance events");
    recorded.iter().any(predicate)
}