#![allow(dead_code)]
use std::sync::Arc;
use crabka_metadata::{MetadataImage, MetadataRecord, PartitionRecord};
use crabka_raft::NodeId;
use tracing::warn;
use crate::config_keys::{
RecoveryStrategy, UNCLEAN_LEADER_ELECTION_ENABLE, resolve_recovery_strategy,
};
use crate::error::BrokerError;
use crate::heartbeat::controller_state::ControllerLivenessState;
/// Result of a failover computation.
///
/// It holds the partition records to commit right away, plus the partitions
/// that must be handed to the unclean recovery manager instead.
#[derive(Debug)]
pub(crate) struct FailoverPlan {
    /// Partition records to submit together in one `submit_change` call.
    pub changes: Vec<MetadataRecord>,
    /// `(topic, partition, strategy)` for leaderless partitions whose topic
    /// selected an offset-aware recovery strategy (`Balanced`/`Aggressive`).
    pub recoveries: Vec<(String, i32, RecoveryStrategy)>,
}
/// Returns whether `topic` has opted into legacy unclean leader election
/// through its `UNCLEAN_LEADER_ELECTION_ENABLE` override.
///
/// The value is parsed like a Kafka boolean config: surrounding whitespace is
/// ignored and `true` is matched case-insensitively. Any of the following
/// means "disabled", so failover stays on the no-data-loss path by default:
/// - no topic config;
/// - no override for the key;
/// - any other value.
fn unclean_election_enabled(image: &MetadataImage, topic: &str) -> bool {
    image
        .topic_config(topic)
        .and_then(|m| m.get(UNCLEAN_LEADER_ELECTION_ENABLE))
        .is_some_and(|v| v.trim().eq_ignore_ascii_case("true"))
}
pub(crate) async fn compute_failover_changes(
image: &MetadataImage,
dead: NodeId,
liveness: &ControllerLivenessState,
metrics: &crate::metrics::BrokerMetrics,
) -> FailoverPlan {
let mut changes: Vec<MetadataRecord> = Vec::new();
let mut recoveries: Vec<(String, i32, RecoveryStrategy)> = Vec::new();
let alive = liveness.alive_snapshot().await;
for (_, pr) in image.all_partitions() {
if !pr.replicas.contains(&dead) && !pr.isr.contains(&dead) {
continue;
}
let mut alive_isr: Vec<NodeId> = Vec::with_capacity(pr.isr.len());
for n in &pr.isr {
if *n != dead && alive.contains(n) {
alive_isr.push(*n);
}
}
let needs_election = pr.leader == dead;
if needs_election {
if let Some(&new_leader) = alive_isr.first() {
changes.push(MetadataRecord::V1Partition(PartitionRecord {
topic: pr.topic.clone(),
partition: pr.partition,
leader: new_leader,
replicas: pr.replicas.clone(),
isr: alive_isr,
leader_epoch: pr.leader_epoch + 1,
adding_replicas: pr.adding_replicas.clone(),
removing_replicas: pr.removing_replicas.clone(),
directories: pr.directories.clone(),
partition_epoch: pr.partition_epoch + 1,
}));
} else {
match resolve_recovery_strategy(image, &pr.topic) {
RecoveryStrategy::Balanced | RecoveryStrategy::Aggressive => {
recoveries.push((
pr.topic.clone(),
pr.partition,
resolve_recovery_strategy(image, &pr.topic),
));
}
RecoveryStrategy::None if unclean_election_enabled(image, &pr.topic) => {
let mut elected: Option<NodeId> = None;
for &n in &pr.replicas {
if n != dead && alive.contains(&n) {
elected = Some(n);
break;
}
}
if let Some(new_leader) = elected {
warn!(
topic = %pr.topic, partition = pr.partition, leader = new_leader,
"unclean leader election: ISR empty, electing out-of-ISR replica (possible data loss)"
);
metrics.record_unclean_leader_election();
changes.push(MetadataRecord::V1Partition(PartitionRecord {
topic: pr.topic.clone(),
partition: pr.partition,
leader: new_leader,
replicas: pr.replicas.clone(),
isr: vec![new_leader],
leader_epoch: pr.leader_epoch + 1,
adding_replicas: pr.adding_replicas.clone(),
removing_replicas: pr.removing_replicas.clone(),
directories: pr.directories.clone(),
partition_epoch: pr.partition_epoch + 1,
}));
} else {
warn!(
topic = %pr.topic, partition = pr.partition,
"unclean leader election enabled but no alive replica; partition unavailable"
);
}
}
RecoveryStrategy::None => {
warn!(
topic = %pr.topic, partition = pr.partition,
"no live ISR replica; partition unavailable (strategy None, unclean.leader.election.enable=false)"
);
}
}
}
} else if alive_isr.len() < pr.isr.len() {
changes.push(MetadataRecord::V1Partition(PartitionRecord {
topic: pr.topic.clone(),
partition: pr.partition,
leader: pr.leader,
replicas: pr.replicas.clone(),
isr: alive_isr,
leader_epoch: pr.leader_epoch,
adding_replicas: pr.adding_replicas.clone(),
removing_replicas: pr.removing_replicas.clone(),
directories: pr.directories.clone(),
partition_epoch: pr.partition_epoch + 1,
}));
}
}
FailoverPlan {
changes,
recoveries,
}
}
#[allow(clippy::too_many_lines)]
pub(crate) async fn compute_offline_dir_failover_changes(
image: &MetadataImage,
broker: NodeId,
offline_uuids: &std::collections::HashSet<uuid::Uuid>,
liveness: &ControllerLivenessState,
metrics: &crate::metrics::BrokerMetrics,
) -> FailoverPlan {
let mut changes: Vec<MetadataRecord> = Vec::new();
let mut recoveries: Vec<(String, i32, RecoveryStrategy)> = Vec::new();
let alive = liveness.alive_snapshot().await;
for (_, pr) in image.all_partitions() {
let Some(slot) = pr.replicas.iter().position(|n| *n == broker) else {
continue;
};
let on_offline = pr
.directories
.get(slot)
.is_some_and(|d| offline_uuids.contains(d));
if !on_offline {
continue;
}
let mut alive_isr: Vec<NodeId> = Vec::with_capacity(pr.isr.len());
for n in &pr.isr {
if *n != broker && alive.contains(n) {
alive_isr.push(*n);
}
}
if pr.leader == broker {
if let Some(&new_leader) = alive_isr.first() {
changes.push(MetadataRecord::V1Partition(PartitionRecord {
topic: pr.topic.clone(),
partition: pr.partition,
leader: new_leader,
replicas: pr.replicas.clone(),
isr: alive_isr,
leader_epoch: pr.leader_epoch + 1,
adding_replicas: pr.adding_replicas.clone(),
removing_replicas: pr.removing_replicas.clone(),
directories: pr.directories.clone(),
partition_epoch: pr.partition_epoch + 1,
}));
} else {
let strategy = resolve_recovery_strategy(image, &pr.topic);
match strategy {
RecoveryStrategy::Balanced | RecoveryStrategy::Aggressive => {
recoveries.push((pr.topic.clone(), pr.partition, strategy));
}
RecoveryStrategy::None if unclean_election_enabled(image, &pr.topic) => {
let mut elected: Option<NodeId> = None;
for &n in &pr.replicas {
if n != broker && alive.contains(&n) {
elected = Some(n);
break;
}
}
if let Some(new_leader) = elected {
warn!(
topic = %pr.topic, partition = pr.partition, leader = new_leader,
"offline-dir unclean leader election: ISR empty, electing out-of-ISR replica (possible data loss)"
);
metrics.record_unclean_leader_election();
changes.push(MetadataRecord::V1Partition(PartitionRecord {
topic: pr.topic.clone(),
partition: pr.partition,
leader: new_leader,
replicas: pr.replicas.clone(),
isr: vec![new_leader],
leader_epoch: pr.leader_epoch + 1,
adding_replicas: pr.adding_replicas.clone(),
removing_replicas: pr.removing_replicas.clone(),
directories: pr.directories.clone(),
partition_epoch: pr.partition_epoch + 1,
}));
} else {
warn!(
topic = %pr.topic, partition = pr.partition,
"offline-dir unclean leader election enabled but no alive replica; partition unavailable"
);
}
}
RecoveryStrategy::None => {
warn!(
topic = %pr.topic, partition = pr.partition,
"offline dir on leader, no live ISR replica; partition unavailable"
);
}
}
}
} else if alive_isr.len() < pr.isr.len() {
changes.push(MetadataRecord::V1Partition(PartitionRecord {
topic: pr.topic.clone(),
partition: pr.partition,
leader: pr.leader,
replicas: pr.replicas.clone(),
isr: alive_isr,
leader_epoch: pr.leader_epoch,
adding_replicas: pr.adding_replicas.clone(),
removing_replicas: pr.removing_replicas.clone(),
directories: pr.directories.clone(),
partition_epoch: pr.partition_epoch + 1,
}));
}
}
FailoverPlan {
changes,
recoveries,
}
}
pub(crate) async fn on_broker_dead(
controller: &Arc<dyn crate::metadata_source::MetadataSource>,
node_id: NodeId,
dead: NodeId,
liveness: &Arc<ControllerLivenessState>,
metrics: &crate::metrics::BrokerMetrics,
recovery: &crate::unclean_recovery::UncleanRecoveryHandle,
) -> Result<(), BrokerError> {
let is_controller_leader = controller
.watch_leader()
.borrow()
.is_some_and(|n| n == node_id);
if !is_controller_leader {
return Ok(());
}
let image = controller.current_image();
let plan = compute_failover_changes(&image, dead, liveness, metrics).await;
if !plan.changes.is_empty() {
controller
.submit_change(plan.changes)
.await
.map_err(|e| BrokerError::Replication(format!("submit_change: {e}")))?;
}
for (topic, partition, strategy) in plan.recoveries {
recovery
.enqueue(crate::unclean_recovery::RecoveryJob {
topic,
partition,
strategy,
reply: None,
})
.await;
}
Ok(())
}
/// Reacts to the liveness tracker seeing broker `_alive` heartbeat again.
///
/// This is currently a no-op that always returns `Ok(())`. Nothing is
/// re-elected or added back to any ISR here. The first four parameters match
/// [`on_broker_dead`]'s, presumably so both callbacks can be wired up the
/// same way.
// NOTE(review): `async` appears to be kept for signature parity with
// `on_broker_dead`; the allow silences clippy's "no await" lint — confirm.
#[allow(clippy::unused_async)]
pub(crate) async fn on_broker_alive(
    _controller: &Arc<dyn crate::metadata_source::MetadataSource>,
    _node_id: NodeId,
    _alive: NodeId,
    _liveness: &Arc<ControllerLivenessState>,
) -> Result<(), BrokerError> {
    Ok(())
}
/// Kind of leader election requested for a single partition.
#[derive(Debug, Clone, Copy)]
#[derive(PartialEq, Eq)]
pub(crate) enum ElectionType {
    /// Move leadership to the first listed replica, which must be alive and in the ISR.
    Preferred,
    /// Only when no ISR member is alive: promote the first alive replica,
    /// accepting possible data loss.
    Unclean,
}
/// Reasons the leader-selection helpers in this module refuse an election.
#[derive(Debug, Clone, Copy)]
#[derive(PartialEq, Eq)]
pub(crate) enum ElectError {
    /// The partition is not in the metadata image, or it has no replicas
    /// for a preferred election.
    UnknownTopicOrPartition,
    /// A preferred election was requested but the preferred replica already leads.
    PreferredAlreadyLeader,
    /// Nothing to do. Either an unclean election was requested while an ISR
    /// member is alive, or the broker passed to the shutdown helper is not
    /// the leader.
    ElectionNotNeeded,
    /// The preferred replica is not in the ISR.
    PreferredNotInIsr,
    /// The preferred replica is in the ISR but not reported alive.
    PreferredNotAlive,
    /// No alive candidate replica exists.
    NoEligibleReplica,
}
pub(crate) async fn select_replacement_leader_for_shutdown(
image: &crabka_metadata::MetadataImage,
liveness: &ControllerLivenessState,
topic: &str,
partition: i32,
shutting_down: NodeId,
) -> Result<crabka_metadata::PartitionRecord, ElectError> {
let pr = image
.partition(topic, partition)
.ok_or(ElectError::UnknownTopicOrPartition)?;
if pr.leader != shutting_down {
return Err(ElectError::ElectionNotNeeded);
}
let mut new_leader: Option<NodeId> = None;
for &n in &pr.isr {
if n == shutting_down {
continue;
}
if liveness.is_alive(n).await {
new_leader = Some(n);
break;
}
}
let Some(new_leader) = new_leader else {
return Err(ElectError::NoEligibleReplica);
};
Ok(crabka_metadata::PartitionRecord {
topic: pr.topic.clone(),
partition: pr.partition,
leader: new_leader,
replicas: pr.replicas.clone(),
isr: pr.isr.clone(),
leader_epoch: pr.leader_epoch + 1,
adding_replicas: pr.adding_replicas.clone(),
removing_replicas: pr.removing_replicas.clone(),
directories: pr.directories.clone(),
partition_epoch: pr.partition_epoch + 1,
})
}
pub(crate) async fn select_new_leader_for_partition(
image: &crabka_metadata::MetadataImage,
liveness: &ControllerLivenessState,
topic: &str,
partition: i32,
election: ElectionType,
) -> Result<PartitionRecord, ElectError> {
let pr = image
.partition(topic, partition)
.ok_or(ElectError::UnknownTopicOrPartition)?;
match election {
ElectionType::Preferred => {
let preferred = *pr
.replicas
.first()
.ok_or(ElectError::UnknownTopicOrPartition)?;
if pr.leader == preferred {
return Err(ElectError::PreferredAlreadyLeader);
}
if !pr.isr.contains(&preferred) {
return Err(ElectError::PreferredNotInIsr);
}
if !liveness.is_alive(preferred).await {
return Err(ElectError::PreferredNotAlive);
}
Ok(PartitionRecord {
topic: pr.topic.clone(),
partition: pr.partition,
leader: preferred,
replicas: pr.replicas.clone(),
isr: pr.isr.clone(),
leader_epoch: pr.leader_epoch + 1,
adding_replicas: pr.adding_replicas.clone(),
removing_replicas: pr.removing_replicas.clone(),
directories: pr.directories.clone(),
partition_epoch: pr.partition_epoch + 1,
})
}
ElectionType::Unclean => {
for &n in &pr.isr {
if liveness.is_alive(n).await {
return Err(ElectError::ElectionNotNeeded);
}
}
for &n in &pr.replicas {
if liveness.is_alive(n).await {
return Ok(PartitionRecord {
topic: pr.topic.clone(),
partition: pr.partition,
leader: n,
replicas: pr.replicas.clone(),
isr: vec![n],
leader_epoch: pr.leader_epoch + 1,
adding_replicas: pr.adding_replicas.clone(),
removing_replicas: pr.removing_replicas.clone(),
directories: pr.directories.clone(),
partition_epoch: pr.partition_epoch + 1,
});
}
}
Err(ElectError::NoEligibleReplica)
}
}
}
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashSet};
    use std::sync::Arc;
    use std::time::Duration;
    use assert2::assert;
    use crabka_metadata::{
        MetadataImage, MetadataRecord, PartitionRecord, TopicConfigRecord, TopicRecord,
    };
    use crabka_raft::NodeId;
    use uuid::Uuid;
    use super::{
        ControllerLivenessState, ElectError, ElectionType, compute_failover_changes,
        compute_offline_dir_failover_changes, select_new_leader_for_partition,
        select_replacement_leader_for_shutdown,
    };
    use crate::config_keys::{
        RecoveryStrategy, UNCLEAN_LEADER_ELECTION_ENABLE, UNCLEAN_RECOVERY_STRATEGY,
    };
    use crate::metrics::BrokerMetrics;
    /// Builds an image with one topic holding one partition.
    ///
    /// The partition is at `leader_epoch` 5 and `partition_epoch` 0, with the
    /// given per-replica directory assignments.
    fn img_with_layout(
        topic: &str,
        partition: i32,
        leader: NodeId,
        replicas: &[NodeId],
        isr: &[NodeId],
        dirs: &[Uuid],
    ) -> MetadataImage {
        let mut img = MetadataImage::new(Uuid::nil());
        img.apply(&MetadataRecord::V1Topic(TopicRecord {
            name: topic.into(),
            topic_id: Uuid::nil(),
            partitions: 1,
            replication_factor: i16::try_from(replicas.len()).unwrap(),
        }));
        img.apply(&MetadataRecord::V1Partition(PartitionRecord {
            topic: topic.into(),
            partition,
            leader,
            replicas: replicas.to_vec(),
            isr: isr.to_vec(),
            leader_epoch: 5,
            adding_replicas: vec![],
            removing_replicas: vec![],
            directories: dirs.to_vec(),
            partition_epoch: 0,
        }));
        img
    }
    /// Single-partition image with no directory assignments.
    fn img_with_partition(
        topic: &str,
        partition: i32,
        leader: NodeId,
        replicas: &[NodeId],
        isr: &[NodeId],
    ) -> MetadataImage {
        img_with_layout(topic, partition, leader, replicas, isr, &[])
    }
    /// Image for partition 0 of `topic` with per-replica directories.
    fn img_with_dirs(
        topic: &str,
        leader: NodeId,
        replicas: &[NodeId],
        isr: &[NodeId],
        dirs: &[Uuid],
    ) -> MetadataImage {
        img_with_layout(topic, 0, leader, replicas, isr, dirs)
    }
    /// Liveness tracker (10s timeout) in which exactly `alive` have heartbeated.
    async fn liveness_with_alive(alive: &[NodeId]) -> Arc<ControllerLivenessState> {
        let l = ControllerLivenessState::new(Duration::from_secs(10));
        for &n in alive {
            l.record_heartbeat(n).await;
        }
        Arc::new(l)
    }
    /// Applies a topic-config record carrying the single override `key=value`.
    fn set_topic_config(img: &mut MetadataImage, topic: &str, key: &str, value: &str) {
        let mut overrides = BTreeMap::new();
        overrides.insert(key.into(), value.into());
        img.apply(&MetadataRecord::V1TopicConfig(TopicConfigRecord {
            topic: topic.into(),
            overrides,
        }));
    }
    /// Collects directory ids into the set shape the offline-dir planner takes.
    fn offline_set(ids: &[Uuid]) -> HashSet<Uuid> {
        ids.iter().copied().collect()
    }
    /// Asserts `changes` is exactly one partition record and returns it.
    fn one_partition_change(changes: &[MetadataRecord]) -> &PartitionRecord {
        assert!(
            changes.len() == 1,
            "expected exactly one change, got {changes:?}"
        );
        match &changes[0] {
            MetadataRecord::V1Partition(pr) => pr,
            other => panic!("expected V1Partition, got {other:?}"),
        }
    }
    #[tokio::test]
    async fn preferred_happy_path() {
        let img = img_with_partition("foo", 0, 2, &[1, 2, 3], &[1, 2, 3]);
        let l = liveness_with_alive(&[1, 2, 3]).await;
        let new_pr = select_new_leader_for_partition(&img, &l, "foo", 0, ElectionType::Preferred)
            .await
            .expect("should elect");
        assert!(new_pr.leader == 1);
        assert!(new_pr.isr == vec![1, 2, 3]);
        assert!(new_pr.leader_epoch == 6);
        assert!(new_pr.partition_epoch == 1);
    }
    #[tokio::test]
    async fn preferred_already_leader() {
        let img = img_with_partition("foo", 0, 1, &[1, 2, 3], &[1, 2, 3]);
        let l = liveness_with_alive(&[1, 2, 3]).await;
        let err = select_new_leader_for_partition(&img, &l, "foo", 0, ElectionType::Preferred)
            .await
            .unwrap_err();
        assert!(err == ElectError::PreferredAlreadyLeader);
    }
    #[tokio::test]
    async fn preferred_not_in_isr() {
        let img = img_with_partition("foo", 0, 2, &[1, 2, 3], &[2, 3]);
        let l = liveness_with_alive(&[1, 2, 3]).await;
        let err = select_new_leader_for_partition(&img, &l, "foo", 0, ElectionType::Preferred)
            .await
            .unwrap_err();
        assert!(err == ElectError::PreferredNotInIsr);
    }
    #[tokio::test]
    async fn preferred_not_alive() {
        let img = img_with_partition("foo", 0, 2, &[1, 2, 3], &[1, 2, 3]);
        let l = liveness_with_alive(&[2, 3]).await;
        let err = select_new_leader_for_partition(&img, &l, "foo", 0, ElectionType::Preferred)
            .await
            .unwrap_err();
        assert!(err == ElectError::PreferredNotAlive);
    }
    #[tokio::test]
    async fn unclean_happy_path() {
        let img = img_with_partition("foo", 0, 1, &[1, 2, 3], &[1]);
        let l = liveness_with_alive(&[2, 3]).await;
        let new_pr = select_new_leader_for_partition(&img, &l, "foo", 0, ElectionType::Unclean)
            .await
            .expect("unclean should elect");
        assert!(new_pr.leader == 2);
        assert!(new_pr.isr == vec![2]);
        assert!(new_pr.leader_epoch == 6);
        assert!(new_pr.partition_epoch == 1);
    }
    #[tokio::test]
    async fn unclean_no_alive_replicas() {
        let img = img_with_partition("foo", 0, 1, &[1, 2, 3], &[1]);
        let l = liveness_with_alive(&[]).await;
        let err = select_new_leader_for_partition(&img, &l, "foo", 0, ElectionType::Unclean)
            .await
            .unwrap_err();
        assert!(err == ElectError::NoEligibleReplica);
    }
    #[tokio::test]
    async fn unclean_isr_member_alive_returns_election_not_needed() {
        let img = img_with_partition("foo", 0, 1, &[1, 2, 3], &[1, 2]);
        let l = liveness_with_alive(&[1, 2]).await;
        let err = select_new_leader_for_partition(&img, &l, "foo", 0, ElectionType::Unclean)
            .await
            .unwrap_err();
        assert!(err == ElectError::ElectionNotNeeded);
    }
    #[tokio::test]
    async fn shutdown_replacement_picks_alive_isr_member() {
        let img = img_with_partition("foo", 0, 1, &[1, 2, 3], &[1, 2, 3]);
        let l = liveness_with_alive(&[1, 2, 3]).await;
        let new_pr = select_replacement_leader_for_shutdown(&img, &l, "foo", 0, 1)
            .await
            .expect("should pick replacement");
        assert!(new_pr.leader == 2);
        assert!(new_pr.isr == vec![1, 2, 3]);
        assert!(new_pr.leader_epoch == 6);
        assert!(new_pr.partition_epoch == 1);
    }
    #[tokio::test]
    async fn shutdown_replacement_skips_dead_isr_members() {
        let img = img_with_partition("foo", 0, 1, &[1, 2, 3], &[1, 2, 3]);
        let l = liveness_with_alive(&[1, 3]).await;
        let new_pr = select_replacement_leader_for_shutdown(&img, &l, "foo", 0, 1)
            .await
            .expect("should pick replacement");
        assert!(new_pr.leader == 3);
        assert!(new_pr.leader_epoch == 6);
    }
    #[tokio::test]
    async fn shutdown_replacement_election_not_needed_when_not_leader() {
        let img = img_with_partition("foo", 0, 1, &[1, 2, 3], &[1, 2, 3]);
        let l = liveness_with_alive(&[1, 2, 3, 5]).await;
        let err = select_replacement_leader_for_shutdown(&img, &l, "foo", 0, 5)
            .await
            .unwrap_err();
        assert!(err == ElectError::ElectionNotNeeded);
    }
    #[tokio::test]
    async fn shutdown_replacement_no_other_isr_alive() {
        let img = img_with_partition("foo", 0, 1, &[1, 2, 3], &[1]);
        let l = liveness_with_alive(&[1, 2, 3]).await;
        let err = select_replacement_leader_for_shutdown(&img, &l, "foo", 0, 1)
            .await
            .unwrap_err();
        assert!(err == ElectError::NoEligibleReplica);
    }
    #[tokio::test]
    async fn shutdown_replacement_other_isr_member_dead_falls_to_no_eligible() {
        let img = img_with_partition("foo", 0, 1, &[1, 2, 3], &[1, 2]);
        let l = liveness_with_alive(&[1, 3]).await;
        let err = select_replacement_leader_for_shutdown(&img, &l, "foo", 0, 1)
            .await
            .unwrap_err();
        assert!(err == ElectError::NoEligibleReplica);
    }
    #[tokio::test]
    async fn shutdown_replacement_unknown_partition() {
        let img = MetadataImage::new(Uuid::nil());
        let l = liveness_with_alive(&[1]).await;
        let err = select_replacement_leader_for_shutdown(&img, &l, "ghost", 0, 1)
            .await
            .unwrap_err();
        assert!(err == ElectError::UnknownTopicOrPartition);
    }
    #[tokio::test]
    async fn unknown_topic_returns_error() {
        let img = MetadataImage::new(Uuid::nil());
        let l = liveness_with_alive(&[]).await;
        let err = select_new_leader_for_partition(&img, &l, "ghost", 0, ElectionType::Preferred)
            .await
            .unwrap_err();
        assert!(err == ElectError::UnknownTopicOrPartition);
    }
    #[tokio::test]
    async fn failover_picks_alive_isr_member_when_available() {
        let img = img_with_partition("t", 0, 1, &[1, 2, 3], &[1, 2, 3]);
        let l = liveness_with_alive(&[2, 3]).await;
        let plan = compute_failover_changes(&img, 1, &l, &BrokerMetrics::new()).await;
        assert!(plan.recoveries.is_empty());
        let pr = one_partition_change(&plan.changes);
        assert!(pr.leader == 2);
        assert!(pr.isr == vec![2, 3]);
        assert!(pr.leader_epoch == 6, "leader_epoch must bump on election");
    }
    #[tokio::test]
    async fn failover_leaves_partition_unavailable_when_unclean_disabled() {
        let img = img_with_partition("t", 0, 1, &[1, 2, 3], &[1]);
        let l = liveness_with_alive(&[2, 3]).await;
        let plan = compute_failover_changes(&img, 1, &l, &BrokerMetrics::new()).await;
        assert!(
            plan.changes.is_empty(),
            "default-off must not emit any change, got {:?}",
            plan.changes,
        );
        assert!(plan.recoveries.is_empty());
    }
    #[tokio::test]
    async fn failover_elects_unclean_when_topic_opts_in() {
        let mut img = img_with_partition("t", 0, 1, &[1, 2, 3], &[1]);
        set_topic_config(&mut img, "t", UNCLEAN_LEADER_ELECTION_ENABLE, "true");
        let l = liveness_with_alive(&[2, 3]).await;
        let metrics = BrokerMetrics::new();
        let plan = compute_failover_changes(&img, 1, &l, &metrics).await;
        assert!(plan.recoveries.is_empty());
        let pr = one_partition_change(&plan.changes);
        assert!(pr.leader == 2, "must elect first alive replica (broker 2)");
        assert!(
            pr.isr == vec![2],
            "unclean election installs singleton ISR (KIP-841)"
        );
        assert!(pr.leader_epoch == 6);
        assert!(metrics.unclean_leader_elections_total.get() == 1);
    }
    #[tokio::test]
    async fn failover_clean_does_not_bump_unclean_counter() {
        let img = img_with_partition("t", 0, 1, &[1, 2, 3], &[1, 2, 3]);
        let l = liveness_with_alive(&[2, 3]).await;
        let metrics = BrokerMetrics::new();
        let _ = compute_failover_changes(&img, 1, &l, &metrics).await;
        assert!(metrics.unclean_leader_elections_total.get() == 0);
    }
    #[tokio::test]
    async fn failover_unclean_skips_when_no_alive_replica() {
        let mut img = img_with_partition("t", 0, 1, &[1, 2, 3], &[1]);
        set_topic_config(&mut img, "t", UNCLEAN_LEADER_ELECTION_ENABLE, "true");
        let l = liveness_with_alive(&[]).await;
        let plan = compute_failover_changes(&img, 1, &l, &BrokerMetrics::new()).await;
        assert!(
            plan.changes.is_empty(),
            "no alive replica → no election, got {:?}",
            plan.changes,
        );
        assert!(plan.recoveries.is_empty());
    }
    #[tokio::test]
    async fn failover_unclean_false_string_keeps_default_safe_behavior() {
        let mut img = img_with_partition("t", 0, 1, &[1, 2, 3], &[1]);
        set_topic_config(&mut img, "t", UNCLEAN_LEADER_ELECTION_ENABLE, "false");
        let l = liveness_with_alive(&[2, 3]).await;
        let plan = compute_failover_changes(&img, 1, &l, &BrokerMetrics::new()).await;
        assert!(
            plan.changes.is_empty(),
            "explicit `false` keeps safe default"
        );
        assert!(plan.recoveries.is_empty());
    }
    #[tokio::test]
    async fn failover_unclean_does_not_pick_dead_broker_itself() {
        let mut img = img_with_partition("t", 0, 1, &[1, 2, 3], &[1]);
        set_topic_config(&mut img, "t", UNCLEAN_LEADER_ELECTION_ENABLE, "true");
        let l = liveness_with_alive(&[3]).await;
        let plan = compute_failover_changes(&img, 1, &l, &BrokerMetrics::new()).await;
        assert!(plan.recoveries.is_empty());
        let pr = one_partition_change(&plan.changes);
        assert!(pr.leader == 3);
        assert!(pr.isr == vec![3]);
    }
    #[tokio::test]
    async fn failover_unclean_does_not_apply_when_isr_still_has_alive_member() {
        let mut img = img_with_partition("t", 0, 1, &[1, 2, 3], &[1, 2]);
        set_topic_config(&mut img, "t", UNCLEAN_LEADER_ELECTION_ENABLE, "true");
        let l = liveness_with_alive(&[2, 3]).await;
        let plan = compute_failover_changes(&img, 1, &l, &BrokerMetrics::new()).await;
        assert!(plan.recoveries.is_empty());
        let pr = one_partition_change(&plan.changes);
        assert!(pr.leader == 2);
        assert!(
            pr.isr == vec![2],
            "clean ISR-only election keeps the surviving ISR member, not a singleton-of-some-other-replica"
        );
    }
    #[tokio::test]
    async fn failover_shrinks_isr_for_partitions_where_dead_is_non_leader() {
        let img = img_with_partition("t", 0, 1, &[1, 2, 3], &[1, 2, 3]);
        let l = liveness_with_alive(&[1, 3]).await;
        let plan = compute_failover_changes(&img, 2, &l, &BrokerMetrics::new()).await;
        assert!(plan.recoveries.is_empty());
        let pr = one_partition_change(&plan.changes);
        assert!(pr.leader == 1, "leader unchanged");
        assert!(pr.isr == vec![1, 3]);
        assert!(
            pr.leader_epoch == 5,
            "non-leader-change must NOT bump leader_epoch"
        );
    }
    #[tokio::test]
    async fn offline_dir_elects_alive_isr_member_when_leader_dir_failed() {
        let bad = Uuid::from_u128(0xDEAD);
        let good = Uuid::from_u128(0x1);
        let img = img_with_dirs("t", 1, &[1, 2, 3], &[1, 2, 3], &[bad, good, good]);
        let l = liveness_with_alive(&[1, 2, 3]).await;
        let offline = offline_set(&[bad]);
        let plan =
            compute_offline_dir_failover_changes(&img, 1, &offline, &l, &BrokerMetrics::new())
                .await;
        let pr = one_partition_change(&plan.changes);
        assert!(pr.leader == 2);
        assert!(pr.isr == vec![2, 3]);
        assert!(pr.leader_epoch == 6);
    }
    #[tokio::test]
    async fn offline_dir_leaves_healthy_dir_partition_untouched() {
        let bad = Uuid::from_u128(0xDEAD);
        let good = Uuid::from_u128(0x1);
        let img = img_with_dirs("t", 1, &[1, 2, 3], &[1, 2, 3], &[good, good, good]);
        let l = liveness_with_alive(&[1, 2, 3]).await;
        let offline = offline_set(&[bad]);
        let plan =
            compute_offline_dir_failover_changes(&img, 1, &offline, &l, &BrokerMetrics::new())
                .await;
        assert!(plan.changes.is_empty());
    }
    #[tokio::test]
    async fn offline_dir_shrinks_isr_for_non_leader_replica() {
        let bad = Uuid::from_u128(0xDEAD);
        let good = Uuid::from_u128(0x1);
        let img = img_with_dirs("t", 1, &[1, 2, 3], &[1, 2, 3], &[good, bad, good]);
        let l = liveness_with_alive(&[1, 2, 3]).await;
        let offline = offline_set(&[bad]);
        let plan =
            compute_offline_dir_failover_changes(&img, 2, &offline, &l, &BrokerMetrics::new())
                .await;
        let pr = one_partition_change(&plan.changes);
        assert!(pr.leader == 1);
        assert!(pr.isr == vec![1, 3]);
        assert!(pr.leader_epoch == 5);
    }
    #[tokio::test]
    async fn offline_dir_idempotent_after_failover() {
        let bad = Uuid::from_u128(0xDEAD);
        let good = Uuid::from_u128(0x1);
        let img = img_with_dirs("t", 2, &[1, 2, 3], &[2, 3], &[bad, good, good]);
        let l = liveness_with_alive(&[1, 2, 3]).await;
        let offline = offline_set(&[bad]);
        let plan =
            compute_offline_dir_failover_changes(&img, 1, &offline, &l, &BrokerMetrics::new())
                .await;
        assert!(plan.changes.is_empty());
    }
    #[tokio::test]
    async fn offline_dir_empty_isr_balanced_strategy_defers_to_urm() {
        let bad = Uuid::from_u128(0xDEAD);
        let good = Uuid::from_u128(0x1);
        let mut img = img_with_dirs("t", 1, &[1, 2, 3], &[1, 2], &[bad, good, good]);
        set_topic_config(&mut img, "t", UNCLEAN_RECOVERY_STRATEGY, "Balanced");
        let l = liveness_with_alive(&[3]).await;
        let offline = offline_set(&[bad]);
        let plan =
            compute_offline_dir_failover_changes(&img, 1, &offline, &l, &BrokerMetrics::new())
                .await;
        assert!(
            plan.changes.is_empty(),
            "Balanced strategy must not make an immediate change; got {:?}",
            plan.changes
        );
        assert!(
            plan.recoveries == vec![("t".to_string(), 0, RecoveryStrategy::Balanced)],
            "Balanced strategy must enqueue a recovery job; got {:?}",
            plan.recoveries
        );
    }
    #[tokio::test]
    async fn offline_dir_empty_isr_aggressive_strategy_defers_to_urm() {
        let bad = Uuid::from_u128(0xDEAD);
        let good = Uuid::from_u128(0x1);
        let mut img = img_with_dirs("t", 1, &[1, 2, 3], &[1, 2], &[bad, good, good]);
        set_topic_config(&mut img, "t", UNCLEAN_RECOVERY_STRATEGY, "Aggressive");
        let l = liveness_with_alive(&[3]).await;
        let offline = offline_set(&[bad]);
        let plan =
            compute_offline_dir_failover_changes(&img, 1, &offline, &l, &BrokerMetrics::new())
                .await;
        assert!(plan.changes.is_empty());
        assert!(
            plan.recoveries == vec![("t".to_string(), 0, RecoveryStrategy::Aggressive)],
            "Aggressive strategy must enqueue a recovery job; got {:?}",
            plan.recoveries
        );
    }
    #[tokio::test]
    async fn offline_dir_empty_isr_unclean_enabled_elects_out_of_isr_replica() {
        let bad = Uuid::from_u128(0xDEAD);
        let good = Uuid::from_u128(0x1);
        let mut img = img_with_dirs("t", 1, &[1, 2, 3], &[1, 2], &[bad, good, good]);
        set_topic_config(&mut img, "t", UNCLEAN_LEADER_ELECTION_ENABLE, "true");
        let l = liveness_with_alive(&[3]).await;
        let offline = offline_set(&[bad]);
        let metrics = BrokerMetrics::new();
        let plan = compute_offline_dir_failover_changes(&img, 1, &offline, &l, &metrics).await;
        assert!(plan.recoveries.is_empty());
        let pr = one_partition_change(&plan.changes);
        assert!(
            pr.leader == 3,
            "must elect broker 3 (only alive out-of-ISR)"
        );
        assert!(pr.isr == vec![3], "unclean election installs singleton ISR");
        assert!(pr.leader_epoch == 6, "epoch must bump");
        assert!(
            metrics.unclean_leader_elections_total.get() == 1,
            "unclean counter must be bumped exactly once"
        );
    }
    #[tokio::test]
    async fn offline_dir_empty_isr_no_unclean_leaves_partition_unavailable() {
        let bad = Uuid::from_u128(0xDEAD);
        let good = Uuid::from_u128(0x1);
        let img = img_with_dirs("t", 1, &[1, 2, 3], &[1, 2], &[bad, good, good]);
        let l = liveness_with_alive(&[3]).await;
        let offline = offline_set(&[bad]);
        let plan =
            compute_offline_dir_failover_changes(&img, 1, &offline, &l, &BrokerMetrics::new())
                .await;
        assert!(
            plan.changes.is_empty(),
            "default-off must not emit any change; got {:?}",
            plan.changes
        );
        assert!(plan.recoveries.is_empty());
    }
    #[tokio::test]
    async fn offline_dir_empty_isr_unclean_enabled_no_alive_replica_stays_unavailable() {
        let bad = Uuid::from_u128(0xDEAD);
        let good = Uuid::from_u128(0x1);
        let mut img = img_with_dirs("t", 1, &[1, 2, 3], &[1, 2], &[bad, good, good]);
        set_topic_config(&mut img, "t", UNCLEAN_LEADER_ELECTION_ENABLE, "true");
        let l = liveness_with_alive(&[]).await;
        let offline = offline_set(&[bad]);
        let metrics = BrokerMetrics::new();
        let plan = compute_offline_dir_failover_changes(&img, 1, &offline, &l, &metrics).await;
        assert!(
            plan.changes.is_empty(),
            "no alive replica → no election; got {:?}",
            plan.changes
        );
        assert!(plan.recoveries.is_empty());
        assert!(
            metrics.unclean_leader_elections_total.get() == 0,
            "no election means no counter bump"
        );
    }
    #[tokio::test]
    async fn failover_balanced_strategy_requests_recovery_not_immediate_change() {
        let mut img = img_with_partition("t", 0, 1, &[1, 2, 3], &[1]);
        set_topic_config(&mut img, "t", UNCLEAN_RECOVERY_STRATEGY, "Balanced");
        let l = liveness_with_alive(&[2, 3]).await;
        let plan = compute_failover_changes(&img, 1, &l, &BrokerMetrics::new()).await;
        assert!(
            plan.changes.is_empty(),
            "Balanced strategy must defer to the URM, not elect immediately, got {:?}",
            plan.changes,
        );
        assert!(plan.recoveries == vec![("t".to_string(), 0, RecoveryStrategy::Balanced)]);
    }
    #[tokio::test]
    async fn failover_strategy_none_still_uses_legacy_enable_flag() {
        let mut img = img_with_partition("t", 0, 1, &[1, 2, 3], &[1]);
        set_topic_config(&mut img, "t", UNCLEAN_LEADER_ELECTION_ENABLE, "true");
        let l = liveness_with_alive(&[2, 3]).await;
        let plan = compute_failover_changes(&img, 1, &l, &BrokerMetrics::new()).await;
        assert!(
            plan.recoveries.is_empty(),
            "strategy None must not enqueue an offset-aware recovery",
        );
        let pr = one_partition_change(&plan.changes);
        assert!(pr.leader == 2, "legacy path picks first alive replica");
        assert!(pr.isr == vec![2]);
    }
}