use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MultiClusterOrchestration {
pub name: String,
pub description: Option<String>,
pub clusters: Vec<ClusterTarget>,
pub synchronization: SyncMode,
pub orchestration: serde_json::Value,
pub failover_policy: FailoverPolicy,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterTarget {
pub name: String,
pub context: String,
pub namespace: String,
pub region: Option<String>,
pub priority: u32,
pub enabled: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum SyncMode {
Parallel, Sequential, Rolling, Canary, }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailoverPolicy {
pub enabled: bool,
pub max_failures: usize,
pub continue_on_cluster_failure: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MultiClusterStatus {
pub orchestration_name: String,
pub start_time: DateTime<Utc>,
pub end_time: Option<DateTime<Utc>>,
pub overall_status: ExecutionStatus,
pub cluster_statuses: HashMap<String, ClusterExecutionStatus>,
pub total_clusters: usize,
pub successful_clusters: usize,
pub failed_clusters: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum ExecutionStatus {
Pending,
Running,
Completed,
Failed,
PartialSuccess,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterExecutionStatus {
pub cluster_name: String,
pub status: ExecutionStatus,
pub start_time: Option<DateTime<Utc>>,
pub end_time: Option<DateTime<Utc>>,
pub progress: f64,
pub error: Option<String>,
pub metrics: ClusterMetrics,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ClusterMetrics {
pub steps_completed: usize,
pub steps_total: usize,
pub failures: usize,
pub avg_latency_ms: f64,
pub error_rate: f64,
}
pub struct MultiClusterOrchestrator {
orchestrations: HashMap<String, MultiClusterOrchestration>,
statuses: HashMap<String, MultiClusterStatus>,
}
impl MultiClusterOrchestrator {
pub fn new() -> Self {
Self {
orchestrations: HashMap::new(),
statuses: HashMap::new(),
}
}
pub fn register(&mut self, orchestration: MultiClusterOrchestration) {
self.orchestrations.insert(orchestration.name.clone(), orchestration);
}
pub async fn execute(&mut self, name: &str) -> Result<MultiClusterStatus, String> {
let orchestration = self
.orchestrations
.get(name)
.ok_or_else(|| format!("Orchestration '{}' not found", name))?
.clone();
let start_time = Utc::now();
let mut cluster_statuses = HashMap::new();
for cluster in &orchestration.clusters {
if cluster.enabled {
cluster_statuses.insert(
cluster.name.clone(),
ClusterExecutionStatus {
cluster_name: cluster.name.clone(),
status: ExecutionStatus::Pending,
start_time: None,
end_time: None,
progress: 0.0,
error: None,
metrics: ClusterMetrics::default(),
},
);
}
}
let mut status = MultiClusterStatus {
orchestration_name: name.to_string(),
start_time,
end_time: None,
overall_status: ExecutionStatus::Running,
cluster_statuses: cluster_statuses.clone(),
total_clusters: orchestration.clusters.iter().filter(|c| c.enabled).count(),
successful_clusters: 0,
failed_clusters: 0,
};
match orchestration.synchronization {
SyncMode::Parallel => {
self.execute_parallel(&orchestration, &mut status).await?;
}
SyncMode::Sequential => {
self.execute_sequential(&orchestration, &mut status).await?;
}
SyncMode::Rolling => {
self.execute_rolling(&orchestration, &mut status).await?;
}
SyncMode::Canary => {
self.execute_canary(&orchestration, &mut status).await?;
}
}
status.end_time = Some(Utc::now());
if status.failed_clusters == 0 {
status.overall_status = ExecutionStatus::Completed;
} else if status.successful_clusters > 0 {
status.overall_status = ExecutionStatus::PartialSuccess;
} else {
status.overall_status = ExecutionStatus::Failed;
}
self.statuses.insert(name.to_string(), status.clone());
Ok(status)
}
async fn execute_parallel(
&self,
orchestration: &MultiClusterOrchestration,
status: &mut MultiClusterStatus,
) -> Result<(), String> {
for cluster in &orchestration.clusters {
if !cluster.enabled {
continue;
}
match self.execute_on_cluster(cluster, &orchestration.orchestration).await {
Ok(cluster_status) => {
status.cluster_statuses.insert(cluster.name.clone(), cluster_status);
status.successful_clusters += 1;
}
Err(e) => {
if let Some(cluster_status) = status.cluster_statuses.get_mut(&cluster.name) {
cluster_status.status = ExecutionStatus::Failed;
cluster_status.error = Some(e.clone());
}
status.failed_clusters += 1;
if !orchestration.failover_policy.continue_on_cluster_failure {
return Err(format!("Cluster {} failed: {}", cluster.name, e));
}
}
}
}
Ok(())
}
async fn execute_sequential(
&self,
orchestration: &MultiClusterOrchestration,
status: &mut MultiClusterStatus,
) -> Result<(), String> {
for cluster in &orchestration.clusters {
if !cluster.enabled {
continue;
}
match self.execute_on_cluster(cluster, &orchestration.orchestration).await {
Ok(cluster_status) => {
status.cluster_statuses.insert(cluster.name.clone(), cluster_status);
status.successful_clusters += 1;
}
Err(e) => {
if let Some(cluster_status) = status.cluster_statuses.get_mut(&cluster.name) {
cluster_status.status = ExecutionStatus::Failed;
cluster_status.error = Some(e.clone());
}
status.failed_clusters += 1;
if !orchestration.failover_policy.continue_on_cluster_failure {
return Err(format!("Cluster {} failed: {}", cluster.name, e));
}
}
}
}
Ok(())
}
async fn execute_rolling(
&self,
orchestration: &MultiClusterOrchestration,
status: &mut MultiClusterStatus,
) -> Result<(), String> {
let window_size = 2;
let mut enabled_clusters: Vec<_> =
orchestration.clusters.iter().filter(|c| c.enabled).collect();
enabled_clusters.sort_by_key(|c| c.priority);
for window in enabled_clusters.chunks(window_size) {
for cluster in window {
match self.execute_on_cluster(cluster, &orchestration.orchestration).await {
Ok(cluster_status) => {
status.cluster_statuses.insert(cluster.name.clone(), cluster_status);
status.successful_clusters += 1;
}
Err(e) => {
if let Some(cluster_status) = status.cluster_statuses.get_mut(&cluster.name)
{
cluster_status.status = ExecutionStatus::Failed;
cluster_status.error = Some(e.clone());
}
status.failed_clusters += 1;
if !orchestration.failover_policy.continue_on_cluster_failure {
return Err(format!("Cluster {} failed: {}", cluster.name, e));
}
}
}
}
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
Ok(())
}
async fn execute_canary(
&self,
orchestration: &MultiClusterOrchestration,
status: &mut MultiClusterStatus,
) -> Result<(), String> {
let mut enabled_clusters: Vec<_> =
orchestration.clusters.iter().filter(|c| c.enabled).collect();
enabled_clusters.sort_by_key(|c| std::cmp::Reverse(c.priority));
if enabled_clusters.is_empty() {
return Err("No enabled clusters".to_string());
}
let canary = enabled_clusters[0];
match self.execute_on_cluster(canary, &orchestration.orchestration).await {
Ok(cluster_status) => {
status.cluster_statuses.insert(canary.name.clone(), cluster_status);
status.successful_clusters += 1;
}
Err(e) => {
return Err(format!("Canary cluster {} failed: {}", canary.name, e));
}
}
for cluster in &enabled_clusters[1..] {
match self.execute_on_cluster(cluster, &orchestration.orchestration).await {
Ok(cluster_status) => {
status.cluster_statuses.insert(cluster.name.clone(), cluster_status);
status.successful_clusters += 1;
}
Err(e) => {
if let Some(cluster_status) = status.cluster_statuses.get_mut(&cluster.name) {
cluster_status.status = ExecutionStatus::Failed;
cluster_status.error = Some(e.clone());
}
status.failed_clusters += 1;
if !orchestration.failover_policy.continue_on_cluster_failure {
return Err(format!("Cluster {} failed: {}", cluster.name, e));
}
}
}
}
Ok(())
}
async fn execute_on_cluster(
&self,
cluster: &ClusterTarget,
_orchestration: &serde_json::Value,
) -> Result<ClusterExecutionStatus, String> {
Ok(ClusterExecutionStatus {
cluster_name: cluster.name.clone(),
status: ExecutionStatus::Completed,
start_time: Some(Utc::now()),
end_time: Some(Utc::now()),
progress: 1.0,
error: None,
metrics: ClusterMetrics {
steps_completed: 5,
steps_total: 5,
failures: 0,
avg_latency_ms: 125.0,
error_rate: 0.0,
},
})
}
pub fn get_status(&self, name: &str) -> Option<&MultiClusterStatus> {
self.statuses.get(name)
}
pub fn list_orchestrations(&self) -> Vec<String> {
self.orchestrations.keys().cloned().collect()
}
}
impl Default for MultiClusterOrchestrator {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_multi_cluster_orchestrator_creation() {
let orchestrator = MultiClusterOrchestrator::new();
assert_eq!(orchestrator.list_orchestrations().len(), 0);
}
#[test]
fn test_register_orchestration() {
let mut orchestrator = MultiClusterOrchestrator::new();
let orch = MultiClusterOrchestration {
name: "test-orch".to_string(),
description: Some("Test".to_string()),
clusters: vec![ClusterTarget {
name: "cluster-1".to_string(),
context: "kind-cluster-1".to_string(),
namespace: "default".to_string(),
region: Some("us-east-1".to_string()),
priority: 1,
enabled: true,
}],
synchronization: SyncMode::Parallel,
orchestration: serde_json::json!({}),
failover_policy: FailoverPolicy {
enabled: true,
max_failures: 1,
continue_on_cluster_failure: true,
},
};
orchestrator.register(orch);
assert_eq!(orchestrator.list_orchestrations().len(), 1);
}
}