use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::RwLock;
use tracing::{info, warn};
use crate::error::{ClusterError, Result};
use crate::raft::OxirsNodeId;
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct Version {
pub major: u32,
pub minor: u32,
pub patch: u32,
}
impl Version {
pub fn new(major: u32, minor: u32, patch: u32) -> Self {
Self {
major,
minor,
patch,
}
}
pub fn parse(s: &str) -> Result<Self> {
let parts: Vec<&str> = s.split('.').collect();
if parts.len() != 3 {
return Err(ClusterError::Config(format!(
"Invalid version format: {}",
s
)));
}
let major = parts[0]
.parse()
.map_err(|_| ClusterError::Config(format!("Invalid major version: {}", parts[0])))?;
let minor = parts[1]
.parse()
.map_err(|_| ClusterError::Config(format!("Invalid minor version: {}", parts[1])))?;
let patch = parts[2]
.parse()
.map_err(|_| ClusterError::Config(format!("Invalid patch version: {}", parts[2])))?;
Ok(Version::new(major, minor, patch))
}
pub fn is_compatible_with(&self, other: &Version) -> bool {
self.major == other.major && *self >= *other
}
}
impl std::fmt::Display for Version {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}.{}.{}", self.major, self.minor, self.patch)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum UpgradeStatus {
NotStarted,
Validating,
InProgress,
Completed,
Failed,
RolledBack,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum UpgradeStrategy {
AllAtOnce,
OneByOne,
Batched,
BlueGreen,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeUpgradeState {
pub node_id: OxirsNodeId,
pub current_version: Version,
pub target_version: Option<Version>,
pub status: UpgradeStatus,
pub is_leader: bool,
pub last_health_check: Option<SystemTime>,
pub upgrade_started_at: Option<SystemTime>,
pub upgrade_completed_at: Option<SystemTime>,
pub error: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RollingUpgradeConfig {
pub strategy: UpgradeStrategy,
pub batch_size: usize,
pub batch_wait_secs: u64,
pub health_check_interval_secs: u64,
pub max_upgrade_duration_secs: u64,
pub auto_rollback: bool,
pub min_healthy_nodes: usize,
pub leader_last: bool,
}
impl Default for RollingUpgradeConfig {
fn default() -> Self {
Self {
strategy: UpgradeStrategy::OneByOne,
batch_size: 2,
batch_wait_secs: 60,
health_check_interval_secs: 10,
max_upgrade_duration_secs: 600, auto_rollback: true,
min_healthy_nodes: 1,
leader_last: true,
}
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct UpgradeStatistics {
pub total_upgrades: u64,
pub successful_upgrades: u64,
pub failed_upgrades: u64,
pub rollbacks_performed: u64,
pub avg_upgrade_duration_secs: f64,
pub last_upgrade: Option<SystemTime>,
}
pub struct RollingUpgradeManager {
config: RollingUpgradeConfig,
nodes: Arc<RwLock<HashMap<OxirsNodeId, NodeUpgradeState>>>,
upgrade_status: Arc<RwLock<UpgradeStatus>>,
target_version: Arc<RwLock<Option<Version>>>,
stats: Arc<RwLock<UpgradeStatistics>>,
upgrade_start_time: Arc<RwLock<Option<SystemTime>>>,
}
impl RollingUpgradeManager {
pub fn new(config: RollingUpgradeConfig) -> Self {
Self {
config,
nodes: Arc::new(RwLock::new(HashMap::new())),
upgrade_status: Arc::new(RwLock::new(UpgradeStatus::NotStarted)),
target_version: Arc::new(RwLock::new(None)),
stats: Arc::new(RwLock::new(UpgradeStatistics::default())),
upgrade_start_time: Arc::new(RwLock::new(None)),
}
}
pub async fn register_node(&self, node_id: OxirsNodeId, version: Version, is_leader: bool) {
let mut nodes = self.nodes.write().await;
nodes.insert(
node_id,
NodeUpgradeState {
node_id,
current_version: version,
target_version: None,
status: UpgradeStatus::NotStarted,
is_leader,
last_health_check: Some(SystemTime::now()),
upgrade_started_at: None,
upgrade_completed_at: None,
error: None,
},
);
info!("Registered node {} for upgrade management", node_id);
}
pub async fn unregister_node(&self, node_id: OxirsNodeId) {
let mut nodes = self.nodes.write().await;
nodes.remove(&node_id);
info!("Unregistered node {} from upgrade management", node_id);
}
pub async fn start_upgrade(&self, target_version: Version) -> Result<()> {
{
let status = self.upgrade_status.read().await;
if *status == UpgradeStatus::InProgress {
return Err(ClusterError::Other(
"Upgrade already in progress".to_string(),
));
}
}
info!("Starting rolling upgrade to version {}", target_version);
self.validate_upgrade(&target_version).await?;
{
let mut status = self.upgrade_status.write().await;
*status = UpgradeStatus::InProgress;
}
{
let mut target = self.target_version.write().await;
*target = Some(target_version.clone());
}
{
let mut start_time = self.upgrade_start_time.write().await;
*start_time = Some(SystemTime::now());
}
{
let mut nodes = self.nodes.write().await;
for node in nodes.values_mut() {
node.target_version = Some(target_version.clone());
node.status = UpgradeStatus::NotStarted;
}
}
Ok(())
}
pub async fn execute_upgrade(&self) -> Result<()> {
let upgrade_order = self.determine_upgrade_order().await;
match self.config.strategy {
UpgradeStrategy::AllAtOnce => {
self.upgrade_all_at_once(&upgrade_order).await?;
}
UpgradeStrategy::OneByOne => {
self.upgrade_one_by_one(&upgrade_order).await?;
}
UpgradeStrategy::Batched => {
self.upgrade_batched(&upgrade_order).await?;
}
UpgradeStrategy::BlueGreen => {
self.upgrade_blue_green(&upgrade_order).await?;
}
}
{
let mut status = self.upgrade_status.write().await;
*status = UpgradeStatus::Completed;
}
self.update_upgrade_statistics(true).await;
info!("Rolling upgrade completed successfully");
Ok(())
}
pub async fn rollback_upgrade(&self) -> Result<()> {
warn!("Rolling back upgrade");
{
let mut status = self.upgrade_status.write().await;
*status = UpgradeStatus::RolledBack;
}
{
let mut stats = self.stats.write().await;
stats.rollbacks_performed += 1;
}
Ok(())
}
pub async fn get_status(&self) -> UpgradeStatus {
*self.upgrade_status.read().await
}
pub async fn get_node_states(&self) -> Vec<NodeUpgradeState> {
let nodes = self.nodes.read().await;
nodes.values().cloned().collect()
}
pub async fn get_statistics(&self) -> UpgradeStatistics {
self.stats.read().await.clone()
}
async fn validate_upgrade(&self, target_version: &Version) -> Result<()> {
let nodes = self.nodes.read().await;
if nodes.is_empty() {
return Err(ClusterError::Config("No nodes registered".to_string()));
}
for node in nodes.values() {
if !target_version.is_compatible_with(&node.current_version) {
return Err(ClusterError::Config(format!(
"Version {} is not compatible with current version {} on node {}",
target_version, node.current_version, node.node_id
)));
}
}
if nodes.len() < self.config.min_healthy_nodes {
return Err(ClusterError::Config(format!(
"Not enough healthy nodes for upgrade: {} < {}",
nodes.len(),
self.config.min_healthy_nodes
)));
}
info!("Upgrade validation passed for version {}", target_version);
Ok(())
}
async fn determine_upgrade_order(&self) -> Vec<OxirsNodeId> {
let nodes = self.nodes.read().await;
let mut order: Vec<_> = nodes.keys().copied().collect();
if self.config.leader_last {
order.sort_by_key(|id| {
let node = nodes.get(id).expect("node should exist in nodes map");
if node.is_leader {
1
} else {
0
}
});
}
order
}
async fn upgrade_all_at_once(&self, order: &[OxirsNodeId]) -> Result<()> {
info!("Upgrading all {} nodes at once", order.len());
for &node_id in order {
self.upgrade_node(node_id).await?;
}
Ok(())
}
async fn upgrade_one_by_one(&self, order: &[OxirsNodeId]) -> Result<()> {
info!("Upgrading {} nodes one by one", order.len());
for &node_id in order {
self.upgrade_node(node_id).await?;
tokio::time::sleep(Duration::from_secs(self.config.batch_wait_secs)).await;
}
Ok(())
}
async fn upgrade_batched(&self, order: &[OxirsNodeId]) -> Result<()> {
info!(
"Upgrading {} nodes in batches of {}",
order.len(),
self.config.batch_size
);
for batch in order.chunks(self.config.batch_size) {
for &node_id in batch {
self.upgrade_node(node_id).await?;
}
tokio::time::sleep(Duration::from_secs(self.config.batch_wait_secs)).await;
}
Ok(())
}
async fn upgrade_blue_green(&self, order: &[OxirsNodeId]) -> Result<()> {
info!("Performing blue-green deployment for {} nodes", order.len());
self.upgrade_one_by_one(order).await
}
async fn upgrade_node(&self, node_id: OxirsNodeId) -> Result<()> {
info!("Upgrading node {}", node_id);
{
let mut nodes = self.nodes.write().await;
if let Some(node) = nodes.get_mut(&node_id) {
node.status = UpgradeStatus::InProgress;
node.upgrade_started_at = Some(SystemTime::now());
}
}
tokio::time::sleep(Duration::from_millis(100)).await;
{
let mut nodes = self.nodes.write().await;
if let Some(node) = nodes.get_mut(&node_id) {
node.status = UpgradeStatus::Completed;
node.upgrade_completed_at = Some(SystemTime::now());
if let Some(target) = &node.target_version {
node.current_version = target.clone();
}
}
}
info!("Node {} upgraded successfully", node_id);
Ok(())
}
async fn update_upgrade_statistics(&self, success: bool) {
let mut stats = self.stats.write().await;
stats.total_upgrades += 1;
if success {
stats.successful_upgrades += 1;
} else {
stats.failed_upgrades += 1;
}
stats.last_upgrade = Some(SystemTime::now());
if let Some(start_time) = *self.upgrade_start_time.read().await {
if let Ok(duration) = SystemTime::now().duration_since(start_time) {
let duration_secs = duration.as_secs() as f64;
if stats.total_upgrades > 1 {
stats.avg_upgrade_duration_secs = (stats.avg_upgrade_duration_secs
* (stats.total_upgrades - 1) as f64
+ duration_secs)
/ stats.total_upgrades as f64;
} else {
stats.avg_upgrade_duration_secs = duration_secs;
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_version_parsing() {
let v = Version::parse("1.2.3").unwrap();
assert_eq!(v.major, 1);
assert_eq!(v.minor, 2);
assert_eq!(v.patch, 3);
}
#[test]
fn test_version_compatibility() {
let v1 = Version::new(1, 2, 3);
let v2 = Version::new(1, 3, 0);
let v3 = Version::new(2, 0, 0);
assert!(v2.is_compatible_with(&v1));
assert!(!v1.is_compatible_with(&v2));
assert!(!v3.is_compatible_with(&v1));
}
#[tokio::test]
async fn test_rolling_upgrade_manager_creation() {
let config = RollingUpgradeConfig::default();
let manager = RollingUpgradeManager::new(config);
let status = manager.get_status().await;
assert_eq!(status, UpgradeStatus::NotStarted);
}
#[tokio::test]
async fn test_register_node() {
let config = RollingUpgradeConfig::default();
let manager = RollingUpgradeManager::new(config);
let version = Version::new(1, 0, 0);
manager.register_node(1, version, false).await;
let states = manager.get_node_states().await;
assert_eq!(states.len(), 1);
assert_eq!(states[0].node_id, 1);
}
#[tokio::test]
async fn test_start_upgrade() {
let config = RollingUpgradeConfig::default();
let manager = RollingUpgradeManager::new(config);
let current_version = Version::new(1, 0, 0);
let target_version = Version::new(1, 1, 0);
manager.register_node(1, current_version, false).await;
manager.register_node(2, Version::new(1, 0, 0), false).await;
let result = manager.start_upgrade(target_version).await;
assert!(result.is_ok());
let status = manager.get_status().await;
assert_eq!(status, UpgradeStatus::InProgress);
}
#[tokio::test]
async fn test_execute_one_by_one_upgrade() {
let config = RollingUpgradeConfig {
strategy: UpgradeStrategy::OneByOne,
batch_wait_secs: 0, ..Default::default()
};
let manager = RollingUpgradeManager::new(config);
manager.register_node(1, Version::new(1, 0, 0), false).await;
manager.register_node(2, Version::new(1, 0, 0), false).await;
manager.start_upgrade(Version::new(1, 1, 0)).await.unwrap();
manager.execute_upgrade().await.unwrap();
let status = manager.get_status().await;
assert_eq!(status, UpgradeStatus::Completed);
}
#[tokio::test]
async fn test_upgrade_statistics() {
let config = RollingUpgradeConfig {
batch_wait_secs: 0, ..Default::default()
};
let manager = RollingUpgradeManager::new(config);
manager.register_node(1, Version::new(1, 0, 0), false).await;
manager.start_upgrade(Version::new(1, 1, 0)).await.unwrap();
manager.execute_upgrade().await.unwrap();
let stats = manager.get_statistics().await;
assert_eq!(stats.total_upgrades, 1);
assert_eq!(stats.successful_upgrades, 1);
}
}