use tracing::{debug, warn};
use super::assignor::PartitionAssignor;
use super::group_coordinator::GroupCoordinator;
use crate::error::{Error, KafkaCode, Result};
use crate::protocol::group::MemberAssignment;
/// Callbacks invoked by [`RebalanceHandler`] when its partition assignment changes.
///
/// Every slice element pairs a topic name with the partitions listed for that
/// topic in the member assignment. Implementors must be `Send + Sync`.
pub trait RebalanceListener: Send + Sync {
/// Called with the currently stored assignment before the handler rejoins the
/// group or leaves it. The handler skips this call when there is no stored
/// assignment or when it contains no topics.
fn on_partitions_revoked(&self, revoked: &[(String, Vec<i32>)]);
/// Called with the new assignment after a successful join or rejoin. The
/// handler makes this call even when the new assignment is empty.
fn on_partitions_assigned(&self, assigned: &[(String, Vec<i32>)]);
}
/// Drives group membership through a [`GroupCoordinator`] and reports assignment
/// changes to an optional [`RebalanceListener`].
pub struct RebalanceHandler {
// Coordinator used for join, rejoin, heartbeat and leave requests.
coordinator: GroupCoordinator,
// Assignor handed to the coordinator on every join and rejoin.
assignor: Box<dyn PartitionAssignor>,
// Optional callback sink; `None` until `with_listener` is called.
listener: Option<Box<dyn RebalanceListener>>,
// Topics passed to the coordinator on every join and rejoin.
subscribed_topics: Vec<String>,
// Assignment from the most recent successful join or rejoin; reset to `None`
// by `leave_group`.
current_assignment: Option<MemberAssignment>,
}
impl RebalanceHandler {
    /// Creates a handler with no listener and no stored assignment.
    ///
    /// `assignor` and `subscribed_topics` are handed to `coordinator` on every
    /// join and rejoin.
    #[must_use]
    pub fn new(
        coordinator: GroupCoordinator,
        assignor: Box<dyn PartitionAssignor>,
        subscribed_topics: Vec<String>,
    ) -> Self {
        Self {
            coordinator,
            assignor,
            listener: None,
            subscribed_topics,
            current_assignment: None,
        }
    }

    /// Builder-style setter that installs `listener`, replacing any previous one.
    #[must_use]
    pub fn with_listener(mut self, listener: Box<dyn RebalanceListener>) -> Self {
        self.listener = Some(listener);
        self
    }

    /// Returns the assignment stored by the last successful join or rejoin, if any.
    #[must_use]
    pub fn current_assignment(&self) -> Option<&MemberAssignment> {
        self.current_assignment.as_ref()
    }

    /// Joins the group, stores the resulting assignment and reports it to the
    /// listener via `on_partitions_assigned`.
    ///
    /// No revoked callback is issued here, even if an assignment is already stored.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the coordinator's `join_group`; in that
    /// case the stored assignment is left untouched and no callback runs.
    pub fn join_group(&mut self) -> Result<MemberAssignment> {
        let assignment = self
            .coordinator
            .join_group(self.assignor.as_ref(), &self.subscribed_topics)?;
        self.current_assignment = Some(assignment.clone());
        self.notify_assigned(&assignment);
        Ok(assignment)
    }

    /// Performs a full rebalance: reports the stored assignment as revoked,
    /// rejoins through the coordinator, stores the new assignment and reports it
    /// as assigned.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the coordinator's `rejoin`. Note that the
    /// revoked callback has already run at that point while the stored assignment
    /// still holds its previous value, so a retry reports the same partitions as
    /// revoked again.
    pub fn handle_rebalance(&mut self) -> Result<MemberAssignment> {
        debug!(
            "Starting rebalance for group '{}'",
            self.coordinator.group_id()
        );

        self.notify_revoked();

        let assignment = self
            .coordinator
            .rejoin(self.assignor.as_ref(), &self.subscribed_topics)?;
        self.current_assignment = Some(assignment.clone());
        self.notify_assigned(&assignment);

        debug!(
            "Rebalance complete for group '{}'",
            self.coordinator.group_id()
        );
        Ok(assignment)
    }

    /// Sends a heartbeat through the coordinator.
    ///
    /// Returns `Ok(true)` when the heartbeat succeeded and `Ok(false)` when it
    /// failed with an error that `is_rebalance_error` classifies as requiring a
    /// rebalance (a warning is logged in that case).
    ///
    /// # Errors
    ///
    /// Returns every other coordinator error unchanged.
    pub fn heartbeat(&mut self) -> Result<bool> {
        match self.coordinator.heartbeat() {
            Ok(()) => Ok(true),
            Err(e) if is_rebalance_error(&e) => {
                warn!(
                    "Heartbeat failed with rebalance error: {:?}. Rebalance needed.",
                    e
                );
                Ok(false)
            }
            Err(e) => Err(e),
        }
    }

    /// Reports the stored assignment as revoked, clears it and asks the
    /// coordinator to leave the group.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the coordinator's `leave_group`. The
    /// stored assignment is cleared before that call, so it is `None` even when
    /// leaving fails.
    pub fn leave_group(&mut self) -> Result<()> {
        self.notify_revoked();
        self.current_assignment = None;
        self.coordinator.leave_group()
    }

    /// Shared access to the underlying coordinator.
    #[must_use]
    pub fn coordinator(&self) -> &GroupCoordinator {
        &self.coordinator
    }

    /// Exclusive access to the underlying coordinator.
    pub fn coordinator_mut(&mut self) -> &mut GroupCoordinator {
        &mut self.coordinator
    }

    /// Flattens an assignment into the `(topic, partitions)` pairs handed to listeners.
    fn topic_partition_pairs(assignment: &MemberAssignment) -> Vec<(String, Vec<i32>)> {
        assignment
            .topic_partitions
            .iter()
            .map(|tp| (tp.topic.clone(), tp.partitions.clone()))
            .collect()
    }

    /// Reports `assignment` to the listener, if one is installed (empty assignments included).
    fn notify_assigned(&self, assignment: &MemberAssignment) {
        if let Some(listener) = self.listener.as_deref() {
            listener.on_partitions_assigned(&Self::topic_partition_pairs(assignment));
        }
    }

    /// Reports the stored assignment as revoked when a listener is installed and
    /// the assignment contains at least one topic.
    fn notify_revoked(&self) {
        let (Some(listener), Some(assignment)) =
            (self.listener.as_deref(), self.current_assignment.as_ref())
        else {
            return;
        };

        let revoked = Self::topic_partition_pairs(assignment);
        if !revoked.is_empty() {
            listener.on_partitions_revoked(&revoked);
        }
    }
}
impl Drop for RebalanceHandler {
fn drop(&mut self) {
if self.current_assignment.is_some() {
let _ = self.coordinator.leave_group();
}
}
}
fn is_rebalance_error(err: &Error) -> bool {
matches!(
err,
Error::Kafka(
KafkaCode::RebalanceInProgress
| KafkaCode::IllegalGeneration
| KafkaCode::UnknownMemberId
| KafkaCode::GroupCoordinatorNotAvailable
)
)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Listener stub whose callbacks ignore their input.
    struct TestListener;

    impl RebalanceListener for TestListener {
        fn on_partitions_revoked(&self, _revoked: &[(String, Vec<i32>)]) {}

        fn on_partitions_assigned(&self, _assigned: &[(String, Vec<i32>)]) {}
    }

    #[test]
    fn test_is_rebalance_error() {
        // Exercise the stub so both callbacks are covered.
        let stub = TestListener;
        stub.on_partitions_revoked(&[]);
        stub.on_partitions_assigned(&[]);

        // Wrap a bare code in `Error::Kafka` and classify it.
        let needs_rebalance = |code: KafkaCode| is_rebalance_error(&Error::Kafka(code));

        // Codes that must trigger a rebalance.
        assert!(needs_rebalance(KafkaCode::RebalanceInProgress));
        assert!(needs_rebalance(KafkaCode::IllegalGeneration));
        assert!(needs_rebalance(KafkaCode::UnknownMemberId));
        assert!(needs_rebalance(KafkaCode::GroupCoordinatorNotAvailable));

        // Codes that must surface as ordinary errors.
        assert!(!needs_rebalance(KafkaCode::UnknownTopicOrPartition));
        assert!(!needs_rebalance(KafkaCode::LeaderNotAvailable));
    }
}