use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use thiserror::Error;
use tracing::{debug, info, warn};
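/// A semantic version (major.minor.patch) with optional build metadata.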
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Version {
pub major: u32,
pub minor: u32,
pub patch: u32,
pub build: Option<String>,
}
impl Version {
pub fn new(major: u32, minor: u32, patch: u32) -> Self {
Self {
major,
minor,
patch,
build: None,
}
}
pub fn with_build(mut self, build: impl Into<String>) -> Self {
self.build = Some(build.into());
self
}
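    /// Two versions are compatible for a rolling upgrade when their major
    /// components match.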
pub fn is_compatible_with(&self, other: &Version) -> bool {
self.major == other.major
}
pub fn is_newer_than(&self, other: &Version) -> bool {
if self.major != other.major {
return self.major > other.major;
}
if self.minor != other.minor {
return self.minor > other.minor;
}
self.patch > other.patch
}
    pub fn parse(s: &str) -> Option<Self> {
        // Split off optional build metadata ("1.2.3+abc" -> "1.2.3" + "abc")
        // so that strings produced by `Display` round-trip through `parse`.
        let (version, build) = match s.split_once('+') {
            Some((v, b)) => (v, Some(b.to_string())),
            None => (s, None),
        };
        let parts: Vec<&str> = version.split('.').collect();
        if parts.len() != 3 {
            return None;
        }
        Some(Self {
            major: parts[0].parse().ok()?,
            minor: parts[1].parse().ok()?,
            patch: parts[2].parse().ok()?,
            build,
        })
    }
}
impl std::fmt::Display for Version {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}.{}.{}", self.major, self.minor, self.patch)?;
if let Some(ref build) = self.build {
write!(f, "+{}", build)?;
}
Ok(())
}
}
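/// Tuning knobs for a rolling upgrade: drain and health-check timing, node
/// ordering, concurrency, and rollback behavior.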
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpgradeConfig {
pub drain_timeout_ms: u64,
pub health_check_interval_ms: u64,
pub required_health_checks: u32,
pub replicas_first: bool,
pub max_concurrent: u32,
pub auto_rollback: bool,
pub min_healthy_nodes: u32,
}
impl Default for UpgradeConfig {
    fn default() -> Self {
        Self {
            drain_timeout_ms: 60_000,
            health_check_interval_ms: 5_000,
            required_health_checks: 3,
            replicas_first: true,
            max_concurrent: 1,
            auto_rollback: true,
            min_healthy_nodes: 1,
        }
    }
}
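/// Per-node upgrade state machine: Normal -> Scheduled -> Draining ->
/// Upgrading -> Recovering -> Completed, with Failed and RollingBack as
/// off-ramps.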
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeUpgradeState {
Normal,
Scheduled,
Draining,
Upgrading,
Recovering,
Completed,
Failed,
RollingBack,
}
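/// Tracking record for a single node's progress through an upgrade.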
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeUpgradeInfo {
pub node_id: String,
pub state: NodeUpgradeState,
pub current_version: Version,
pub target_version: Option<Version>,
pub started_at: Option<u64>,
pub completed_at: Option<u64>,
pub error: Option<String>,
pub health_checks_passed: u32,
pub is_leader: bool,
}
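/// Overall status of an upgrade plan.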
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum UpgradeStatus {
Idle,
Planning,
InProgress,
Paused,
Completed,
Failed,
RollingBack,
}
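/// An ordered plan for upgrading the cluster to `target_version`;
/// `current_index` points at the start of the next batch in `node_order`.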
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpgradePlan {
pub upgrade_id: String,
pub target_version: Version,
pub node_order: Vec<String>,
pub current_index: usize,
pub status: UpgradeStatus,
pub created_at: u64,
pub started_at: Option<u64>,
pub completed_at: Option<u64>,
}
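/// Snapshot of node states for the current upgrade; `rollback_count` is the
/// number of nodes currently rolling back.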
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct UpgradeStats {
pub total_nodes: u32,
pub upgraded_nodes: u32,
pub failed_nodes: u32,
pub pending_nodes: u32,
pub currently_upgrading: u32,
pub rollback_count: u32,
}
#[derive(Debug, Error)]
pub enum UpgradeError {
#[error("Upgrade already in progress: {0}")]
AlreadyInProgress(String),
#[error("Incompatible version: {current} cannot upgrade to {target}")]
IncompatibleVersion { current: String, target: String },
#[error("Node not found: {0}")]
NodeNotFound(String),
#[error("Drain timeout for node: {0}")]
DrainTimeout(String),
#[error("Health check failed for node: {0}")]
HealthCheckFailed(String),
#[error("Not enough healthy nodes: have {have}, need {need}")]
NotEnoughHealthyNodes { have: u32, need: u32 },
#[error("Upgrade failed: {0}")]
UpgradeFailed(String),
#[error("Rollback failed: {0}")]
RollbackFailed(String),
#[error("No upgrade in progress")]
NoUpgradeInProgress,
#[error("Upgrade paused")]
UpgradePaused,
}
pub type Result<T> = std::result::Result<T, UpgradeError>;
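/// Coordinates a rolling upgrade across the set of registered nodes.
///
/// A typical flow, mirroring `test_drain_complete_upgrade_cycle` below
/// (sketch only, error handling elided):
///
/// ```ignore
/// let manager = UpgradeManager::new(UpgradeConfig::default());
/// manager.register_node("node1", Version::new(1, 0, 0), false);
///
/// manager.plan_upgrade(Version::new(1, 1, 0))?;
/// manager.start_upgrade()?;
///
/// // Drive each scheduled node through drain -> upgrade -> recovery.
/// manager.start_drain("node1")?;
/// manager.complete_drain("node1")?;
/// manager.complete_node_upgrade("node1", Version::new(1, 1, 0))?;
/// manager.record_health_check("node1", true)?;
/// ```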
pub struct UpgradeManager {
config: UpgradeConfig,
current_plan: Arc<RwLock<Option<UpgradePlan>>>,
node_states: Arc<RwLock<HashMap<String, NodeUpgradeInfo>>>,
paused: AtomicBool,
}
impl UpgradeManager {
pub fn new(config: UpgradeConfig) -> Self {
Self {
config,
current_plan: Arc::new(RwLock::new(None)),
node_states: Arc::new(RwLock::new(HashMap::new())),
paused: AtomicBool::new(false),
}
}
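    /// Registers a node with its current version and leadership status,
    /// replacing any existing entry for the same id.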
pub fn register_node(&self, node_id: &str, version: Version, is_leader: bool) {
let mut states = self.node_states.write();
states.insert(
node_id.to_string(),
NodeUpgradeInfo {
node_id: node_id.to_string(),
state: NodeUpgradeState::Normal,
current_version: version,
target_version: None,
started_at: None,
completed_at: None,
error: None,
health_checks_passed: 0,
is_leader,
},
);
}
pub fn unregister_node(&self, node_id: &str) {
let mut states = self.node_states.write();
states.remove(node_id);
}
pub fn update_leader_status(&self, node_id: &str, is_leader: bool) {
let mut states = self.node_states.write();
if let Some(info) = states.get_mut(node_id) {
info.is_leader = is_leader;
}
}
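    /// Builds an upgrade plan covering every registered node that is not
    /// already at the target version, rejecting targets with a different
    /// major version. Replicas are ordered before leaders when
    /// `replicas_first` is set.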
pub fn plan_upgrade(&self, target_version: Version) -> Result<UpgradePlan> {
{
let plan = self.current_plan.read();
if let Some(ref p) = *plan {
if p.status == UpgradeStatus::InProgress {
return Err(UpgradeError::AlreadyInProgress(p.upgrade_id.clone()));
}
}
}
let states = self.node_states.read();
for (_node_id, info) in states.iter() {
if !info.current_version.is_compatible_with(&target_version) {
return Err(UpgradeError::IncompatibleVersion {
current: info.current_version.to_string(),
target: target_version.to_string(),
});
}
}
let mut replicas: Vec<String> = Vec::new();
let mut leaders: Vec<String> = Vec::new();
for (node_id, info) in states.iter() {
if info.current_version == target_version {
continue;
}
if info.is_leader {
leaders.push(node_id.clone());
} else {
replicas.push(node_id.clone());
}
}
replicas.sort();
leaders.sort();
let node_order = if self.config.replicas_first {
replicas.into_iter().chain(leaders).collect()
} else {
leaders.into_iter().chain(replicas).collect()
};
let plan = UpgradePlan {
upgrade_id: generate_upgrade_id(),
target_version,
node_order,
current_index: 0,
status: UpgradeStatus::Planning,
created_at: current_time_ms(),
started_at: None,
completed_at: None,
};
        // Release the node-state read lock before taking the plan lock so the
        // acquisition order (plan, then states) matches the other methods and
        // cannot deadlock against them.
        drop(states);
        {
let mut current = self.current_plan.write();
*current = Some(plan.clone());
}
info!("Created upgrade plan: {}", plan.upgrade_id);
Ok(plan)
}
pub fn start_upgrade(&self) -> Result<()> {
let mut plan = self.current_plan.write();
let p = plan.as_mut().ok_or(UpgradeError::NoUpgradeInProgress)?;
if p.status != UpgradeStatus::Planning && p.status != UpgradeStatus::Paused {
return Err(UpgradeError::AlreadyInProgress(p.upgrade_id.clone()));
}
p.status = UpgradeStatus::InProgress;
p.started_at = Some(current_time_ms());
self.paused.store(false, Ordering::SeqCst);
        let mut states = self.node_states.write();
        let batch_size = self.config.max_concurrent as usize;
        // Schedule the next batch from the current position so that calling
        // this on a paused plan does not re-schedule nodes that already finished.
        for node_id in p.node_order.iter().skip(p.current_index).take(batch_size) {
            if let Some(info) = states.get_mut(node_id) {
                info.state = NodeUpgradeState::Scheduled;
                info.target_version = Some(p.target_version.clone());
            }
        }
info!("Started upgrade: {}", p.upgrade_id);
Ok(())
}
pub fn pause_upgrade(&self) -> Result<()> {
let mut plan = self.current_plan.write();
let p = plan.as_mut().ok_or(UpgradeError::NoUpgradeInProgress)?;
if p.status != UpgradeStatus::InProgress {
return Err(UpgradeError::NoUpgradeInProgress);
}
p.status = UpgradeStatus::Paused;
self.paused.store(true, Ordering::SeqCst);
info!("Paused upgrade: {}", p.upgrade_id);
Ok(())
}
pub fn resume_upgrade(&self) -> Result<()> {
let mut plan = self.current_plan.write();
let p = plan.as_mut().ok_or(UpgradeError::NoUpgradeInProgress)?;
if p.status != UpgradeStatus::Paused {
return Err(UpgradeError::NoUpgradeInProgress);
}
p.status = UpgradeStatus::InProgress;
self.paused.store(false, Ordering::SeqCst);
info!("Resumed upgrade: {}", p.upgrade_id);
Ok(())
}
pub fn is_paused(&self) -> bool {
self.paused.load(Ordering::SeqCst)
}
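    /// Transitions a scheduled node into `Draining`; nodes in any other state
    /// are left untouched.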
pub fn start_drain(&self, node_id: &str) -> Result<()> {
let mut states = self.node_states.write();
let info = states
.get_mut(node_id)
.ok_or_else(|| UpgradeError::NodeNotFound(node_id.to_string()))?;
        if info.state != NodeUpgradeState::Scheduled {
            return Ok(());
        }
info.state = NodeUpgradeState::Draining;
info.started_at = Some(current_time_ms());
debug!("Started draining node: {}", node_id);
Ok(())
}
pub fn complete_drain(&self, node_id: &str) -> Result<()> {
let mut states = self.node_states.write();
let info = states
.get_mut(node_id)
.ok_or_else(|| UpgradeError::NodeNotFound(node_id.to_string()))?;
        if info.state != NodeUpgradeState::Draining {
            return Ok(());
        }
info.state = NodeUpgradeState::Upgrading;
debug!("Node {} drain complete, starting upgrade", node_id);
Ok(())
}
pub fn complete_node_upgrade(&self, node_id: &str, new_version: Version) -> Result<()> {
let mut states = self.node_states.write();
let info = states
.get_mut(node_id)
.ok_or_else(|| UpgradeError::NodeNotFound(node_id.to_string()))?;
info.state = NodeUpgradeState::Recovering;
info.current_version = new_version;
info.health_checks_passed = 0;
debug!("Node {} upgrade complete, starting recovery", node_id);
Ok(())
}
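    /// Records a health-check result for a recovering node. Returns `Ok(true)`
    /// once the node has passed the required number of consecutive checks and
    /// the plan has advanced to the next batch. A failed check resets the
    /// counter and, with `auto_rollback` enabled, marks the node `Failed` and
    /// returns `HealthCheckFailed`.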
pub fn record_health_check(&self, node_id: &str, healthy: bool) -> Result<bool> {
let mut states = self.node_states.write();
let info = states
.get_mut(node_id)
.ok_or_else(|| UpgradeError::NodeNotFound(node_id.to_string()))?;
if info.state != NodeUpgradeState::Recovering {
return Ok(false);
}
if healthy {
info.health_checks_passed += 1;
if info.health_checks_passed >= self.config.required_health_checks {
info.state = NodeUpgradeState::Completed;
info.completed_at = Some(current_time_ms());
debug!("Node {} recovery complete", node_id);
drop(states);
self.advance_upgrade()?;
return Ok(true);
}
} else {
info.health_checks_passed = 0;
if self.config.auto_rollback {
info.state = NodeUpgradeState::Failed;
info.error = Some("Health check failed".to_string());
return Err(UpgradeError::HealthCheckFailed(node_id.to_string()));
}
}
Ok(false)
}
fn advance_upgrade(&self) -> Result<()> {
let mut plan = self.current_plan.write();
let p = plan.as_mut().ok_or(UpgradeError::NoUpgradeInProgress)?;
if p.status != UpgradeStatus::InProgress {
return Ok(());
}
p.current_index += 1;
if p.current_index >= p.node_order.len() {
p.status = UpgradeStatus::Completed;
p.completed_at = Some(current_time_ms());
info!("Upgrade {} completed successfully", p.upgrade_id);
return Ok(());
}
let mut states = self.node_states.write();
let batch_end =
(p.current_index + self.config.max_concurrent as usize).min(p.node_order.len());
for node_id in p.node_order[p.current_index..batch_end].iter() {
if let Some(info) = states.get_mut(node_id) {
info.state = NodeUpgradeState::Scheduled;
info.target_version = Some(p.target_version.clone());
}
}
Ok(())
}
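    /// Marks a node's upgrade as failed and, when `auto_rollback` is enabled,
    /// initiates a cluster-wide rollback.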
pub fn fail_node_upgrade(&self, node_id: &str, error: &str) -> Result<()> {
let mut states = self.node_states.write();
let info = states
.get_mut(node_id)
.ok_or_else(|| UpgradeError::NodeNotFound(node_id.to_string()))?;
info.state = NodeUpgradeState::Failed;
info.error = Some(error.to_string());
warn!("Node {} upgrade failed: {}", node_id, error);
if self.config.auto_rollback {
drop(states);
self.initiate_rollback()?;
}
Ok(())
}
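    /// Switches the current plan to `RollingBack` and flags every node that is
    /// mid-upgrade for rollback.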
pub fn initiate_rollback(&self) -> Result<()> {
let mut plan = self.current_plan.write();
let p = plan.as_mut().ok_or(UpgradeError::NoUpgradeInProgress)?;
p.status = UpgradeStatus::RollingBack;
warn!("Initiating rollback for upgrade: {}", p.upgrade_id);
let mut states = self.node_states.write();
for (_, info) in states.iter_mut() {
if matches!(
info.state,
NodeUpgradeState::Scheduled
| NodeUpgradeState::Draining
| NodeUpgradeState::Upgrading
| NodeUpgradeState::Recovering
) {
info.state = NodeUpgradeState::RollingBack;
}
}
Ok(())
}
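    /// Restores a node to its previous version; once no node is left
    /// mid-upgrade, the plan is finalized as `Failed`.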
pub fn complete_rollback(&self, node_id: &str, previous_version: Version) -> Result<()> {
let mut states = self.node_states.write();
let info = states
.get_mut(node_id)
.ok_or_else(|| UpgradeError::NodeNotFound(node_id.to_string()))?;
info.state = NodeUpgradeState::Normal;
info.current_version = previous_version;
info.target_version = None;
info.error = None;
debug!("Node {} rollback complete", node_id);
let all_normal = states.values().all(|i| {
matches!(
i.state,
NodeUpgradeState::Normal | NodeUpgradeState::Completed | NodeUpgradeState::Failed
)
});
if all_normal {
drop(states);
let mut plan = self.current_plan.write();
if let Some(ref mut p) = *plan {
if p.status == UpgradeStatus::RollingBack {
p.status = UpgradeStatus::Failed;
p.completed_at = Some(current_time_ms());
info!("Rollback completed for upgrade: {}", p.upgrade_id);
}
}
}
Ok(())
}
pub fn get_plan(&self) -> Option<UpgradePlan> {
self.current_plan.read().clone()
}
pub fn get_node_status(&self, node_id: &str) -> Option<NodeUpgradeInfo> {
self.node_states.read().get(node_id).cloned()
}
pub fn get_stats(&self) -> UpgradeStats {
let states = self.node_states.read();
let mut upgraded_nodes = 0u32;
let mut failed_nodes = 0u32;
let mut pending_nodes = 0u32;
let mut currently_upgrading = 0u32;
let mut rollback_count = 0u32;
for info in states.values() {
match info.state {
NodeUpgradeState::Completed => upgraded_nodes += 1,
NodeUpgradeState::Failed => failed_nodes += 1,
NodeUpgradeState::Scheduled | NodeUpgradeState::Normal => pending_nodes += 1,
NodeUpgradeState::Draining
| NodeUpgradeState::Upgrading
| NodeUpgradeState::Recovering => {
currently_upgrading += 1;
}
NodeUpgradeState::RollingBack => rollback_count += 1,
}
}
UpgradeStats {
total_nodes: states.len() as u32,
upgraded_nodes,
failed_nodes,
pending_nodes,
currently_upgrading,
rollback_count,
}
}
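    /// Checks whether the upgrade may continue: errors if the upgrade is
    /// paused or if fewer than `min_healthy_nodes` nodes are in a healthy
    /// (`Normal` or `Completed`) state.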
pub fn can_proceed(&self) -> Result<bool> {
if self.paused.load(Ordering::SeqCst) {
return Err(UpgradeError::UpgradePaused);
}
let states = self.node_states.read();
let healthy_count = states
.values()
.filter(|i| {
matches!(
i.state,
NodeUpgradeState::Normal | NodeUpgradeState::Completed
)
})
.count() as u32;
if healthy_count < self.config.min_healthy_nodes {
return Err(UpgradeError::NotEnoughHealthyNodes {
have: healthy_count,
need: self.config.min_healthy_nodes,
});
}
Ok(true)
}
pub fn get_nodes_to_drain(&self) -> Vec<String> {
self.node_states
.read()
.iter()
.filter(|(_, i)| i.state == NodeUpgradeState::Scheduled)
.map(|(id, _)| id.clone())
.collect()
}
pub fn get_draining_nodes(&self) -> Vec<String> {
self.node_states
.read()
.iter()
.filter(|(_, i)| i.state == NodeUpgradeState::Draining)
.map(|(id, _)| id.clone())
.collect()
}
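    /// Returns `Err(DrainTimeout)` if the node has been draining longer than
    /// the configured timeout; otherwise `Ok(false)`.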
pub fn check_drain_timeout(&self, node_id: &str) -> Result<bool> {
let states = self.node_states.read();
let info = states
.get(node_id)
.ok_or_else(|| UpgradeError::NodeNotFound(node_id.to_string()))?;
if info.state != NodeUpgradeState::Draining {
return Ok(false);
}
if let Some(started) = info.started_at {
let elapsed = current_time_ms() - started;
if elapsed > self.config.drain_timeout_ms {
return Err(UpgradeError::DrainTimeout(node_id.to_string()));
}
}
Ok(false)
}
}
fn current_time_ms() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64
}
fn generate_upgrade_id() -> String {
use std::time::{SystemTime, UNIX_EPOCH};
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis();
format!("upgrade-{}", timestamp)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_version_parsing() {
let v = Version::parse("1.2.3").unwrap();
assert_eq!(v.major, 1);
assert_eq!(v.minor, 2);
assert_eq!(v.patch, 3);
assert_eq!(v.to_string(), "1.2.3");
}
#[test]
fn test_version_with_build() {
let v = Version::new(1, 2, 3).with_build("abc123");
assert_eq!(v.to_string(), "1.2.3+abc123");
}
#[test]
fn test_version_compatibility() {
let v1 = Version::new(1, 0, 0);
let v2 = Version::new(1, 1, 0);
let v3 = Version::new(2, 0, 0);
assert!(v1.is_compatible_with(&v2));
assert!(v2.is_compatible_with(&v1));
assert!(!v1.is_compatible_with(&v3));
}
#[test]
fn test_version_comparison() {
let v1 = Version::new(1, 0, 0);
let v2 = Version::new(1, 1, 0);
let v3 = Version::new(1, 1, 1);
let v4 = Version::new(2, 0, 0);
assert!(v2.is_newer_than(&v1));
assert!(v3.is_newer_than(&v2));
assert!(v4.is_newer_than(&v3));
assert!(!v1.is_newer_than(&v2));
}
#[test]
fn test_upgrade_config_defaults() {
let config = UpgradeConfig::default();
assert_eq!(config.drain_timeout_ms, 60000);
assert_eq!(config.health_check_interval_ms, 5000);
assert_eq!(config.required_health_checks, 3);
assert!(config.replicas_first);
assert_eq!(config.max_concurrent, 1);
assert!(config.auto_rollback);
}
#[test]
fn test_register_and_unregister_node() {
let manager = UpgradeManager::new(UpgradeConfig::default());
manager.register_node("node1", Version::new(1, 0, 0), false);
manager.register_node("node2", Version::new(1, 0, 0), true);
assert!(manager.get_node_status("node1").is_some());
assert!(manager.get_node_status("node2").is_some());
manager.unregister_node("node1");
assert!(manager.get_node_status("node1").is_none());
assert!(manager.get_node_status("node2").is_some());
}
#[test]
fn test_plan_upgrade() {
let manager = UpgradeManager::new(UpgradeConfig::default());
manager.register_node("replica1", Version::new(1, 0, 0), false);
manager.register_node("replica2", Version::new(1, 0, 0), false);
manager.register_node("leader1", Version::new(1, 0, 0), true);
let plan = manager.plan_upgrade(Version::new(1, 1, 0)).unwrap();
assert_eq!(plan.target_version, Version::new(1, 1, 0));
assert_eq!(plan.node_order.len(), 3);
assert!(!plan.node_order[0].contains("leader"));
assert!(!plan.node_order[1].contains("leader"));
assert!(plan.node_order[2].contains("leader"));
}
#[test]
fn test_incompatible_version_rejected() {
let manager = UpgradeManager::new(UpgradeConfig::default());
manager.register_node("node1", Version::new(1, 0, 0), false);
let result = manager.plan_upgrade(Version::new(2, 0, 0));
assert!(matches!(
result,
Err(UpgradeError::IncompatibleVersion { .. })
));
}
#[test]
fn test_upgrade_lifecycle() {
let manager = UpgradeManager::new(UpgradeConfig::default());
manager.register_node("node1", Version::new(1, 0, 0), false);
let plan = manager.plan_upgrade(Version::new(1, 1, 0)).unwrap();
assert_eq!(plan.status, UpgradeStatus::Planning);
manager.start_upgrade().unwrap();
let plan = manager.get_plan().unwrap();
assert_eq!(plan.status, UpgradeStatus::InProgress);
let info = manager.get_node_status("node1").unwrap();
assert_eq!(info.state, NodeUpgradeState::Scheduled);
}
#[test]
fn test_drain_complete_upgrade_cycle() {
let config = UpgradeConfig {
required_health_checks: 1,
..Default::default()
};
let manager = UpgradeManager::new(config);
manager.register_node("node1", Version::new(1, 0, 0), false);
manager.plan_upgrade(Version::new(1, 1, 0)).unwrap();
manager.start_upgrade().unwrap();
manager.start_drain("node1").unwrap();
assert_eq!(
manager.get_node_status("node1").unwrap().state,
NodeUpgradeState::Draining
);
manager.complete_drain("node1").unwrap();
assert_eq!(
manager.get_node_status("node1").unwrap().state,
NodeUpgradeState::Upgrading
);
manager
.complete_node_upgrade("node1", Version::new(1, 1, 0))
.unwrap();
assert_eq!(
manager.get_node_status("node1").unwrap().state,
NodeUpgradeState::Recovering
);
manager.record_health_check("node1", true).unwrap();
assert_eq!(
manager.get_node_status("node1").unwrap().state,
NodeUpgradeState::Completed
);
let plan = manager.get_plan().unwrap();
assert_eq!(plan.status, UpgradeStatus::Completed);
}
#[test]
fn test_pause_and_resume() {
let manager = UpgradeManager::new(UpgradeConfig::default());
manager.register_node("node1", Version::new(1, 0, 0), false);
manager.plan_upgrade(Version::new(1, 1, 0)).unwrap();
manager.start_upgrade().unwrap();
manager.pause_upgrade().unwrap();
assert!(manager.is_paused());
assert_eq!(manager.get_plan().unwrap().status, UpgradeStatus::Paused);
manager.resume_upgrade().unwrap();
assert!(!manager.is_paused());
assert_eq!(
manager.get_plan().unwrap().status,
UpgradeStatus::InProgress
);
}
#[test]
fn test_upgrade_stats() {
let manager = UpgradeManager::new(UpgradeConfig::default());
manager.register_node("node1", Version::new(1, 0, 0), false);
manager.register_node("node2", Version::new(1, 0, 0), false);
manager.register_node("node3", Version::new(1, 0, 0), true);
let stats = manager.get_stats();
assert_eq!(stats.total_nodes, 3);
assert_eq!(stats.pending_nodes, 3);
assert_eq!(stats.upgraded_nodes, 0);
}
#[test]
fn test_rollback() {
let manager = UpgradeManager::new(UpgradeConfig::default());
manager.register_node("node1", Version::new(1, 0, 0), false);
manager.plan_upgrade(Version::new(1, 1, 0)).unwrap();
manager.start_upgrade().unwrap();
manager.start_drain("node1").unwrap();
manager.initiate_rollback().unwrap();
let info = manager.get_node_status("node1").unwrap();
assert_eq!(info.state, NodeUpgradeState::RollingBack);
manager
.complete_rollback("node1", Version::new(1, 0, 0))
.unwrap();
let info = manager.get_node_status("node1").unwrap();
assert_eq!(info.state, NodeUpgradeState::Normal);
assert_eq!(info.current_version, Version::new(1, 0, 0));
}
#[test]
fn test_skip_already_upgraded_nodes() {
let manager = UpgradeManager::new(UpgradeConfig::default());
manager.register_node("node1", Version::new(1, 1, 0), false);
manager.register_node("node2", Version::new(1, 0, 0), false);
let plan = manager.plan_upgrade(Version::new(1, 1, 0)).unwrap();
assert_eq!(plan.node_order.len(), 1);
assert_eq!(plan.node_order[0], "node2");
}
#[test]
fn test_update_leader_status() {
let manager = UpgradeManager::new(UpgradeConfig::default());
manager.register_node("node1", Version::new(1, 0, 0), false);
assert!(!manager.get_node_status("node1").unwrap().is_leader);
manager.update_leader_status("node1", true);
assert!(manager.get_node_status("node1").unwrap().is_leader);
}
}