mockforge_chaos/
multi_cluster.rs

1//! Multi-cluster orchestration support
2//!
3//! Execute chaos orchestrations across multiple Kubernetes clusters simultaneously.
4
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
/// Multi-cluster orchestration configuration
///
/// Describes one orchestration payload together with the clusters it should
/// run on and the strategy used to fan it out across them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MultiClusterOrchestration {
    /// Unique name; the orchestrator uses it as the registration key.
    pub name: String,
    /// Optional human-readable description (not read by the orchestrator in this file).
    pub description: Option<String>,
    /// Candidate clusters; only entries with `enabled == true` are executed.
    pub clusters: Vec<ClusterTarget>,
    /// Strategy used to order/stage execution across the enabled clusters.
    pub synchronization: SyncMode,
    /// Opaque orchestration payload handed to each per-cluster execution.
    pub orchestration: serde_json::Value,
    /// Controls whether a failing cluster aborts the remaining work.
    pub failover_policy: FailoverPolicy,
}
19
/// Target cluster configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterTarget {
    /// Cluster name; used as the key of the per-cluster status map.
    pub name: String,
    /// Connection context for the cluster.
    /// NOTE(review): presumably a kubeconfig context name; it is not read in this file.
    pub context: String,
    /// Namespace to target (not read in this file).
    pub namespace: String,
    /// Optional region label (not read in this file).
    pub region: Option<String>,
    /// Ordering weight: rolling mode runs clusters in ascending priority, while
    /// canary mode picks the cluster with the highest priority value first.
    pub priority: u32,
    /// Disabled clusters are skipped by every execution strategy.
    pub enabled: bool,
}
30
/// Synchronization mode for multi-cluster execution
///
/// Serialized with lowercase variant names (`"parallel"`, `"sequential"`,
/// `"rolling"`, `"canary"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum SyncMode {
    /// Execute on all clusters simultaneously.
    /// NOTE(review): the orchestrator in this file still awaits clusters one at a time.
    Parallel,
    /// Execute one cluster at a time, in declaration order.
    Sequential,
    /// Rolling execution in fixed-size windows ordered by ascending priority,
    /// with a pause between windows.
    Rolling,
    /// Execute on the canary (highest-priority) cluster first, then the others;
    /// a canary failure aborts the run.
    Canary,
}
40
/// Failover policy
///
/// NOTE(review): only `continue_on_cluster_failure` is consulted by the
/// orchestrator in this file; `enabled` and `max_failures` are stored but
/// never read here. Confirm whether another module enforces them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailoverPolicy {
    /// Whether failover handling is enabled (not read in this file).
    pub enabled: bool,
    /// Failure budget (not read in this file).
    pub max_failures: usize,
    /// When `false`, the first failing cluster aborts the whole orchestration;
    /// when `true`, the failure is recorded and execution moves on.
    pub continue_on_cluster_failure: bool,
}
48
/// Multi-cluster execution status
///
/// Aggregated result of one `execute` call for a single orchestration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MultiClusterStatus {
    /// Name of the orchestration this status belongs to.
    pub orchestration_name: String,
    /// UTC timestamp taken when execution started.
    pub start_time: DateTime<Utc>,
    /// UTC timestamp taken when execution finished; `None` while still running.
    pub end_time: Option<DateTime<Utc>>,
    /// Roll-up of the per-cluster outcomes.
    pub overall_status: ExecutionStatus,
    /// Per-cluster status keyed by cluster name (enabled clusters only).
    pub cluster_statuses: HashMap<String, ClusterExecutionStatus>,
    /// Number of enabled clusters targeted by the run.
    pub total_clusters: usize,
    /// Number of clusters whose execution returned successfully.
    pub successful_clusters: usize,
    /// Number of clusters whose execution returned an error.
    pub failed_clusters: usize,
}
61
/// Execution status
///
/// Used both for individual clusters and for the overall roll-up.
/// Serialized with lowercase variant names; note that `rename_all = "lowercase"`
/// inserts no separator, so `PartialSuccess` is written as `"partialsuccess"`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum ExecutionStatus {
    /// Not started yet.
    Pending,
    /// Execution in progress.
    Running,
    /// Finished without any failure.
    Completed,
    /// Finished with no successful cluster (or the cluster itself failed).
    Failed,
    /// Some clusters succeeded and some failed.
    PartialSuccess,
}
72
/// Status for individual cluster
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterExecutionStatus {
    /// Name of the cluster this entry describes.
    pub cluster_name: String,
    /// Current state of the execution on this cluster.
    pub status: ExecutionStatus,
    /// When execution on this cluster started; `None` until it has run.
    pub start_time: Option<DateTime<Utc>>,
    /// When execution on this cluster ended; `None` until it has finished.
    pub end_time: Option<DateTime<Utc>>,
    /// Completion progress; initialised to `0.0` and set to `1.0` by a completed run.
    /// NOTE(review): presumably a fraction in `0.0..=1.0` — confirm with consumers.
    pub progress: f64,
    /// Error message recorded when execution on this cluster failed.
    pub error: Option<String>,
    /// Metrics collected for this cluster's execution.
    pub metrics: ClusterMetrics,
}
84
/// Metrics for cluster execution
///
/// `Default` yields all-zero metrics, which is what a pending cluster starts with.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ClusterMetrics {
    /// Number of orchestration steps that have completed.
    pub steps_completed: usize,
    /// Total number of orchestration steps.
    pub steps_total: usize,
    /// Number of failures observed during execution.
    pub failures: usize,
    /// Average latency in milliseconds.
    pub avg_latency_ms: f64,
    /// Observed error rate.
    /// NOTE(review): presumably a fraction in `0.0..=1.0` rather than a percentage — confirm.
    pub error_rate: f64,
}
94
/// Multi-cluster orchestrator
///
/// In-memory registry of orchestrations plus the recorded status of their runs.
pub struct MultiClusterOrchestrator {
    /// Registered orchestrations keyed by orchestration name.
    orchestrations: HashMap<String, MultiClusterOrchestration>,
    /// Most recently recorded execution status keyed by orchestration name.
    statuses: HashMap<String, MultiClusterStatus>,
}
100
101impl MultiClusterOrchestrator {
102    /// Create a new multi-cluster orchestrator
103    pub fn new() -> Self {
104        Self {
105            orchestrations: HashMap::new(),
106            statuses: HashMap::new(),
107        }
108    }
109
110    /// Register a multi-cluster orchestration
111    pub fn register(&mut self, orchestration: MultiClusterOrchestration) {
112        self.orchestrations.insert(orchestration.name.clone(), orchestration);
113    }
114
115    /// Execute a multi-cluster orchestration
116    pub async fn execute(&mut self, name: &str) -> Result<MultiClusterStatus, String> {
117        let orchestration = self
118            .orchestrations
119            .get(name)
120            .ok_or_else(|| format!("Orchestration '{}' not found", name))?
121            .clone();
122
123        let start_time = Utc::now();
124        let mut cluster_statuses = HashMap::new();
125
126        // Initialize cluster statuses
127        for cluster in &orchestration.clusters {
128            if cluster.enabled {
129                cluster_statuses.insert(
130                    cluster.name.clone(),
131                    ClusterExecutionStatus {
132                        cluster_name: cluster.name.clone(),
133                        status: ExecutionStatus::Pending,
134                        start_time: None,
135                        end_time: None,
136                        progress: 0.0,
137                        error: None,
138                        metrics: ClusterMetrics::default(),
139                    },
140                );
141            }
142        }
143
144        let mut status = MultiClusterStatus {
145            orchestration_name: name.to_string(),
146            start_time,
147            end_time: None,
148            overall_status: ExecutionStatus::Running,
149            cluster_statuses: cluster_statuses.clone(),
150            total_clusters: orchestration.clusters.iter().filter(|c| c.enabled).count(),
151            successful_clusters: 0,
152            failed_clusters: 0,
153        };
154
155        // Execute based on synchronization mode
156        match orchestration.synchronization {
157            SyncMode::Parallel => {
158                self.execute_parallel(&orchestration, &mut status).await?;
159            }
160            SyncMode::Sequential => {
161                self.execute_sequential(&orchestration, &mut status).await?;
162            }
163            SyncMode::Rolling => {
164                self.execute_rolling(&orchestration, &mut status).await?;
165            }
166            SyncMode::Canary => {
167                self.execute_canary(&orchestration, &mut status).await?;
168            }
169        }
170
171        status.end_time = Some(Utc::now());
172
173        // Determine overall status
174        if status.failed_clusters == 0 {
175            status.overall_status = ExecutionStatus::Completed;
176        } else if status.successful_clusters > 0 {
177            status.overall_status = ExecutionStatus::PartialSuccess;
178        } else {
179            status.overall_status = ExecutionStatus::Failed;
180        }
181
182        self.statuses.insert(name.to_string(), status.clone());
183
184        Ok(status)
185    }
186
187    /// Execute on all clusters in parallel
188    async fn execute_parallel(
189        &self,
190        orchestration: &MultiClusterOrchestration,
191        status: &mut MultiClusterStatus,
192    ) -> Result<(), String> {
193        // In real implementation, spawn tasks for each cluster
194        for cluster in &orchestration.clusters {
195            if !cluster.enabled {
196                continue;
197            }
198
199            match self.execute_on_cluster(cluster, &orchestration.orchestration).await {
200                Ok(cluster_status) => {
201                    status.cluster_statuses.insert(cluster.name.clone(), cluster_status);
202                    status.successful_clusters += 1;
203                }
204                Err(e) => {
205                    if let Some(cluster_status) = status.cluster_statuses.get_mut(&cluster.name) {
206                        cluster_status.status = ExecutionStatus::Failed;
207                        cluster_status.error = Some(e.clone());
208                    }
209                    status.failed_clusters += 1;
210
211                    if !orchestration.failover_policy.continue_on_cluster_failure {
212                        return Err(format!("Cluster {} failed: {}", cluster.name, e));
213                    }
214                }
215            }
216        }
217
218        Ok(())
219    }
220
221    /// Execute on clusters sequentially
222    async fn execute_sequential(
223        &self,
224        orchestration: &MultiClusterOrchestration,
225        status: &mut MultiClusterStatus,
226    ) -> Result<(), String> {
227        for cluster in &orchestration.clusters {
228            if !cluster.enabled {
229                continue;
230            }
231
232            match self.execute_on_cluster(cluster, &orchestration.orchestration).await {
233                Ok(cluster_status) => {
234                    status.cluster_statuses.insert(cluster.name.clone(), cluster_status);
235                    status.successful_clusters += 1;
236                }
237                Err(e) => {
238                    if let Some(cluster_status) = status.cluster_statuses.get_mut(&cluster.name) {
239                        cluster_status.status = ExecutionStatus::Failed;
240                        cluster_status.error = Some(e.clone());
241                    }
242                    status.failed_clusters += 1;
243
244                    if !orchestration.failover_policy.continue_on_cluster_failure {
245                        return Err(format!("Cluster {} failed: {}", cluster.name, e));
246                    }
247                }
248            }
249        }
250
251        Ok(())
252    }
253
254    /// Execute with rolling deployment
255    async fn execute_rolling(
256        &self,
257        orchestration: &MultiClusterOrchestration,
258        status: &mut MultiClusterStatus,
259    ) -> Result<(), String> {
260        // Rolling execution with window size of 1-2 clusters at a time
261        let window_size = 2;
262        let mut enabled_clusters: Vec<_> =
263            orchestration.clusters.iter().filter(|c| c.enabled).collect();
264
265        enabled_clusters.sort_by_key(|c| c.priority);
266
267        for window in enabled_clusters.chunks(window_size) {
268            for cluster in window {
269                match self.execute_on_cluster(cluster, &orchestration.orchestration).await {
270                    Ok(cluster_status) => {
271                        status.cluster_statuses.insert(cluster.name.clone(), cluster_status);
272                        status.successful_clusters += 1;
273                    }
274                    Err(e) => {
275                        if let Some(cluster_status) = status.cluster_statuses.get_mut(&cluster.name)
276                        {
277                            cluster_status.status = ExecutionStatus::Failed;
278                            cluster_status.error = Some(e.clone());
279                        }
280                        status.failed_clusters += 1;
281
282                        if !orchestration.failover_policy.continue_on_cluster_failure {
283                            return Err(format!("Cluster {} failed: {}", cluster.name, e));
284                        }
285                    }
286                }
287            }
288
289            // Small delay between windows
290            tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
291        }
292
293        Ok(())
294    }
295
296    /// Execute with canary strategy
297    async fn execute_canary(
298        &self,
299        orchestration: &MultiClusterOrchestration,
300        status: &mut MultiClusterStatus,
301    ) -> Result<(), String> {
302        // Find canary cluster (highest priority)
303        let mut enabled_clusters: Vec<_> =
304            orchestration.clusters.iter().filter(|c| c.enabled).collect();
305
306        enabled_clusters.sort_by_key(|c| std::cmp::Reverse(c.priority));
307
308        if enabled_clusters.is_empty() {
309            return Err("No enabled clusters".to_string());
310        }
311
312        // Execute on canary first
313        let canary = enabled_clusters[0];
314        match self.execute_on_cluster(canary, &orchestration.orchestration).await {
315            Ok(cluster_status) => {
316                status.cluster_statuses.insert(canary.name.clone(), cluster_status);
317                status.successful_clusters += 1;
318            }
319            Err(e) => {
320                return Err(format!("Canary cluster {} failed: {}", canary.name, e));
321            }
322        }
323
324        // If canary succeeded, execute on remaining clusters
325        for cluster in &enabled_clusters[1..] {
326            match self.execute_on_cluster(cluster, &orchestration.orchestration).await {
327                Ok(cluster_status) => {
328                    status.cluster_statuses.insert(cluster.name.clone(), cluster_status);
329                    status.successful_clusters += 1;
330                }
331                Err(e) => {
332                    if let Some(cluster_status) = status.cluster_statuses.get_mut(&cluster.name) {
333                        cluster_status.status = ExecutionStatus::Failed;
334                        cluster_status.error = Some(e.clone());
335                    }
336                    status.failed_clusters += 1;
337
338                    if !orchestration.failover_policy.continue_on_cluster_failure {
339                        return Err(format!("Cluster {} failed: {}", cluster.name, e));
340                    }
341                }
342            }
343        }
344
345        Ok(())
346    }
347
348    /// Execute orchestration on a single cluster
349    async fn execute_on_cluster(
350        &self,
351        cluster: &ClusterTarget,
352        _orchestration: &serde_json::Value,
353    ) -> Result<ClusterExecutionStatus, String> {
354        // In real implementation:
355        // 1. Connect to cluster using context
356        // 2. Deploy orchestration
357        // 3. Monitor execution
358        // 4. Collect metrics
359
360        // Simulated execution
361        Ok(ClusterExecutionStatus {
362            cluster_name: cluster.name.clone(),
363            status: ExecutionStatus::Completed,
364            start_time: Some(Utc::now()),
365            end_time: Some(Utc::now()),
366            progress: 1.0,
367            error: None,
368            metrics: ClusterMetrics {
369                steps_completed: 5,
370                steps_total: 5,
371                failures: 0,
372                avg_latency_ms: 125.0,
373                error_rate: 0.0,
374            },
375        })
376    }
377
378    /// Get status of a multi-cluster orchestration
379    pub fn get_status(&self, name: &str) -> Option<&MultiClusterStatus> {
380        self.statuses.get(name)
381    }
382
383    /// List all registered orchestrations
384    pub fn list_orchestrations(&self) -> Vec<String> {
385        self.orchestrations.keys().cloned().collect()
386    }
387}
388
389impl Default for MultiClusterOrchestrator {
390    fn default() -> Self {
391        Self::new()
392    }
393}
394
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a parallel-mode orchestration fixture with one enabled cluster.
    fn sample_orchestration() -> MultiClusterOrchestration {
        let cluster = ClusterTarget {
            name: "cluster-1".to_string(),
            context: "kind-cluster-1".to_string(),
            namespace: "default".to_string(),
            region: Some("us-east-1".to_string()),
            priority: 1,
            enabled: true,
        };

        let failover_policy = FailoverPolicy {
            enabled: true,
            max_failures: 1,
            continue_on_cluster_failure: true,
        };

        MultiClusterOrchestration {
            name: "test-orch".to_string(),
            description: Some("Test".to_string()),
            clusters: vec![cluster],
            synchronization: SyncMode::Parallel,
            orchestration: serde_json::json!({}),
            failover_policy,
        }
    }

    /// A freshly created orchestrator has nothing registered.
    #[test]
    fn test_multi_cluster_orchestrator_creation() {
        let orchestrator = MultiClusterOrchestrator::new();
        assert!(orchestrator.list_orchestrations().is_empty());
    }

    /// Registering one orchestration makes exactly one name listable.
    #[test]
    fn test_register_orchestration() {
        let mut orchestrator = MultiClusterOrchestrator::new();

        orchestrator.register(sample_orchestration());

        let registered = orchestrator.list_orchestrations();
        assert_eq!(registered.len(), 1);
    }
}