1use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
/// Declarative description of one orchestration to run across several clusters.
///
/// Instances are stored by [`MultiClusterOrchestrator::register`] and run by
/// [`MultiClusterOrchestrator::execute`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MultiClusterOrchestration {
    /// Key under which the orchestration is registered and later looked up.
    pub name: String,
    /// Optional free-form description; not read anywhere in this file.
    pub description: Option<String>,
    /// Candidate clusters; only entries with `enabled == true` are executed.
    pub clusters: Vec<ClusterTarget>,
    /// Strategy used to walk through the enabled clusters.
    pub synchronization: SyncMode,
    /// Opaque JSON payload handed unchanged to the per-cluster execution step.
    pub orchestration: serde_json::Value,
    /// Rules deciding whether a cluster failure aborts the run.
    pub failover_policy: FailoverPolicy,
}
19
/// One cluster an orchestration can be executed on.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterTarget {
    /// Cluster identifier; also the key of its entry in
    /// `MultiClusterStatus::cluster_statuses`.
    pub name: String,
    /// Not read in this file — presumably a kubeconfig context name (TODO confirm).
    pub context: String,
    /// Not read in this file — presumably the target namespace (TODO confirm).
    pub namespace: String,
    /// Optional region label; not read in this file.
    pub region: Option<String>,
    /// Ordering key: rolling execution visits lower values first, canary
    /// execution picks the highest value as the canary.
    pub priority: u32,
    /// Clusters with `enabled == false` are skipped by every execution mode.
    pub enabled: bool,
}
30
31#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
33#[serde(rename_all = "lowercase")]
34pub enum SyncMode {
35 Parallel, Sequential, Rolling, Canary, }
40
/// Rules describing how an orchestration reacts to cluster failures.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailoverPolicy {
    /// NOTE(review): not consulted by any execution path in this file.
    pub enabled: bool,
    /// NOTE(review): not consulted by any execution path in this file —
    /// presumably a failure budget (TODO confirm intended semantics).
    pub max_failures: usize,
    /// When `false`, the first failing cluster aborts the run with an error;
    /// when `true`, the failure is recorded and the remaining clusters still run.
    pub continue_on_cluster_failure: bool,
}
48
/// Aggregated result of one orchestration run across all enabled clusters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MultiClusterStatus {
    /// Name of the orchestration this status belongs to.
    pub orchestration_name: String,
    /// UTC timestamp taken when the run started.
    pub start_time: DateTime<Utc>,
    /// UTC timestamp taken when the run ended; `None` while still running.
    pub end_time: Option<DateTime<Utc>>,
    /// Overall outcome derived from the success and failure counters.
    pub overall_status: ExecutionStatus,
    /// Per-cluster status keyed by cluster name; holds enabled clusters only.
    pub cluster_statuses: HashMap<String, ClusterExecutionStatus>,
    /// Number of enabled clusters in the orchestration.
    pub total_clusters: usize,
    /// Number of clusters whose execution returned successfully.
    pub successful_clusters: usize,
    /// Number of clusters whose execution returned an error.
    pub failed_clusters: usize,
}
61
62#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
64#[serde(rename_all = "lowercase")]
65pub enum ExecutionStatus {
66 Pending,
67 Running,
68 Completed,
69 Failed,
70 PartialSuccess,
71}
72
/// Execution state of an orchestration on one cluster.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterExecutionStatus {
    /// Name of the cluster this status describes.
    pub cluster_name: String,
    /// Current lifecycle state for this cluster.
    pub status: ExecutionStatus,
    /// When execution on this cluster started; `None` while pending.
    pub start_time: Option<DateTime<Utc>>,
    /// When execution on this cluster ended; `None` while pending.
    pub end_time: Option<DateTime<Utc>>,
    /// Set to `0.0` for a pending cluster and `1.0` for a completed one in this
    /// file — presumably a fraction in `[0, 1]` (TODO confirm).
    pub progress: f64,
    /// Error message recorded when execution on this cluster failed.
    pub error: Option<String>,
    /// Counters and measurements collected for this cluster.
    pub metrics: ClusterMetrics,
}
84
/// Per-cluster execution counters; `Default` yields all zeros.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ClusterMetrics {
    /// Number of steps completed so far.
    pub steps_completed: usize,
    /// Total number of steps.
    pub steps_total: usize,
    /// Number of failures observed.
    pub failures: usize,
    /// Average latency in milliseconds (unit taken from the field name).
    pub avg_latency_ms: f64,
    /// Error rate — presumably a fraction in `[0, 1]` (TODO confirm).
    pub error_rate: f64,
}
94
95pub struct MultiClusterOrchestrator {
97 orchestrations: HashMap<String, MultiClusterOrchestration>,
98 statuses: HashMap<String, MultiClusterStatus>,
99}
100
101impl MultiClusterOrchestrator {
102 pub fn new() -> Self {
104 Self {
105 orchestrations: HashMap::new(),
106 statuses: HashMap::new(),
107 }
108 }
109
110 pub fn register(&mut self, orchestration: MultiClusterOrchestration) {
112 self.orchestrations.insert(orchestration.name.clone(), orchestration);
113 }
114
115 pub async fn execute(&mut self, name: &str) -> Result<MultiClusterStatus, String> {
117 let orchestration = self
118 .orchestrations
119 .get(name)
120 .ok_or_else(|| format!("Orchestration '{}' not found", name))?
121 .clone();
122
123 let start_time = Utc::now();
124 let mut cluster_statuses = HashMap::new();
125
126 for cluster in &orchestration.clusters {
128 if cluster.enabled {
129 cluster_statuses.insert(
130 cluster.name.clone(),
131 ClusterExecutionStatus {
132 cluster_name: cluster.name.clone(),
133 status: ExecutionStatus::Pending,
134 start_time: None,
135 end_time: None,
136 progress: 0.0,
137 error: None,
138 metrics: ClusterMetrics::default(),
139 },
140 );
141 }
142 }
143
144 let mut status = MultiClusterStatus {
145 orchestration_name: name.to_string(),
146 start_time,
147 end_time: None,
148 overall_status: ExecutionStatus::Running,
149 cluster_statuses: cluster_statuses.clone(),
150 total_clusters: orchestration.clusters.iter().filter(|c| c.enabled).count(),
151 successful_clusters: 0,
152 failed_clusters: 0,
153 };
154
155 match orchestration.synchronization {
157 SyncMode::Parallel => {
158 self.execute_parallel(&orchestration, &mut status).await?;
159 }
160 SyncMode::Sequential => {
161 self.execute_sequential(&orchestration, &mut status).await?;
162 }
163 SyncMode::Rolling => {
164 self.execute_rolling(&orchestration, &mut status).await?;
165 }
166 SyncMode::Canary => {
167 self.execute_canary(&orchestration, &mut status).await?;
168 }
169 }
170
171 status.end_time = Some(Utc::now());
172
173 if status.failed_clusters == 0 {
175 status.overall_status = ExecutionStatus::Completed;
176 } else if status.successful_clusters > 0 {
177 status.overall_status = ExecutionStatus::PartialSuccess;
178 } else {
179 status.overall_status = ExecutionStatus::Failed;
180 }
181
182 self.statuses.insert(name.to_string(), status.clone());
183
184 Ok(status)
185 }
186
187 async fn execute_parallel(
189 &self,
190 orchestration: &MultiClusterOrchestration,
191 status: &mut MultiClusterStatus,
192 ) -> Result<(), String> {
193 for cluster in &orchestration.clusters {
195 if !cluster.enabled {
196 continue;
197 }
198
199 match self.execute_on_cluster(cluster, &orchestration.orchestration).await {
200 Ok(cluster_status) => {
201 status.cluster_statuses.insert(cluster.name.clone(), cluster_status);
202 status.successful_clusters += 1;
203 }
204 Err(e) => {
205 if let Some(cluster_status) = status.cluster_statuses.get_mut(&cluster.name) {
206 cluster_status.status = ExecutionStatus::Failed;
207 cluster_status.error = Some(e.clone());
208 }
209 status.failed_clusters += 1;
210
211 if !orchestration.failover_policy.continue_on_cluster_failure {
212 return Err(format!("Cluster {} failed: {}", cluster.name, e));
213 }
214 }
215 }
216 }
217
218 Ok(())
219 }
220
221 async fn execute_sequential(
223 &self,
224 orchestration: &MultiClusterOrchestration,
225 status: &mut MultiClusterStatus,
226 ) -> Result<(), String> {
227 for cluster in &orchestration.clusters {
228 if !cluster.enabled {
229 continue;
230 }
231
232 match self.execute_on_cluster(cluster, &orchestration.orchestration).await {
233 Ok(cluster_status) => {
234 status.cluster_statuses.insert(cluster.name.clone(), cluster_status);
235 status.successful_clusters += 1;
236 }
237 Err(e) => {
238 if let Some(cluster_status) = status.cluster_statuses.get_mut(&cluster.name) {
239 cluster_status.status = ExecutionStatus::Failed;
240 cluster_status.error = Some(e.clone());
241 }
242 status.failed_clusters += 1;
243
244 if !orchestration.failover_policy.continue_on_cluster_failure {
245 return Err(format!("Cluster {} failed: {}", cluster.name, e));
246 }
247 }
248 }
249 }
250
251 Ok(())
252 }
253
254 async fn execute_rolling(
256 &self,
257 orchestration: &MultiClusterOrchestration,
258 status: &mut MultiClusterStatus,
259 ) -> Result<(), String> {
260 let window_size = 2;
262 let mut enabled_clusters: Vec<_> =
263 orchestration.clusters.iter().filter(|c| c.enabled).collect();
264
265 enabled_clusters.sort_by_key(|c| c.priority);
266
267 for window in enabled_clusters.chunks(window_size) {
268 for cluster in window {
269 match self.execute_on_cluster(cluster, &orchestration.orchestration).await {
270 Ok(cluster_status) => {
271 status.cluster_statuses.insert(cluster.name.clone(), cluster_status);
272 status.successful_clusters += 1;
273 }
274 Err(e) => {
275 if let Some(cluster_status) = status.cluster_statuses.get_mut(&cluster.name)
276 {
277 cluster_status.status = ExecutionStatus::Failed;
278 cluster_status.error = Some(e.clone());
279 }
280 status.failed_clusters += 1;
281
282 if !orchestration.failover_policy.continue_on_cluster_failure {
283 return Err(format!("Cluster {} failed: {}", cluster.name, e));
284 }
285 }
286 }
287 }
288
289 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
291 }
292
293 Ok(())
294 }
295
296 async fn execute_canary(
298 &self,
299 orchestration: &MultiClusterOrchestration,
300 status: &mut MultiClusterStatus,
301 ) -> Result<(), String> {
302 let mut enabled_clusters: Vec<_> =
304 orchestration.clusters.iter().filter(|c| c.enabled).collect();
305
306 enabled_clusters.sort_by_key(|c| std::cmp::Reverse(c.priority));
307
308 if enabled_clusters.is_empty() {
309 return Err("No enabled clusters".to_string());
310 }
311
312 let canary = enabled_clusters[0];
314 match self.execute_on_cluster(canary, &orchestration.orchestration).await {
315 Ok(cluster_status) => {
316 status.cluster_statuses.insert(canary.name.clone(), cluster_status);
317 status.successful_clusters += 1;
318 }
319 Err(e) => {
320 return Err(format!("Canary cluster {} failed: {}", canary.name, e));
321 }
322 }
323
324 for cluster in &enabled_clusters[1..] {
326 match self.execute_on_cluster(cluster, &orchestration.orchestration).await {
327 Ok(cluster_status) => {
328 status.cluster_statuses.insert(cluster.name.clone(), cluster_status);
329 status.successful_clusters += 1;
330 }
331 Err(e) => {
332 if let Some(cluster_status) = status.cluster_statuses.get_mut(&cluster.name) {
333 cluster_status.status = ExecutionStatus::Failed;
334 cluster_status.error = Some(e.clone());
335 }
336 status.failed_clusters += 1;
337
338 if !orchestration.failover_policy.continue_on_cluster_failure {
339 return Err(format!("Cluster {} failed: {}", cluster.name, e));
340 }
341 }
342 }
343 }
344
345 Ok(())
346 }
347
348 async fn execute_on_cluster(
350 &self,
351 cluster: &ClusterTarget,
352 _orchestration: &serde_json::Value,
353 ) -> Result<ClusterExecutionStatus, String> {
354 Ok(ClusterExecutionStatus {
362 cluster_name: cluster.name.clone(),
363 status: ExecutionStatus::Completed,
364 start_time: Some(Utc::now()),
365 end_time: Some(Utc::now()),
366 progress: 1.0,
367 error: None,
368 metrics: ClusterMetrics {
369 steps_completed: 5,
370 steps_total: 5,
371 failures: 0,
372 avg_latency_ms: 125.0,
373 error_rate: 0.0,
374 },
375 })
376 }
377
378 pub fn get_status(&self, name: &str) -> Option<&MultiClusterStatus> {
380 self.statuses.get(name)
381 }
382
383 pub fn list_orchestrations(&self) -> Vec<String> {
385 self.orchestrations.keys().cloned().collect()
386 }
387}
388
/// The default orchestrator is the empty one produced by
/// [`MultiClusterOrchestrator::new`].
impl Default for MultiClusterOrchestrator {
    fn default() -> Self {
        Self::new()
    }
}
394
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the single-cluster orchestration used as a registration fixture.
    fn sample_orchestration() -> MultiClusterOrchestration {
        let target = ClusterTarget {
            name: "cluster-1".to_string(),
            context: "kind-cluster-1".to_string(),
            namespace: "default".to_string(),
            region: Some("us-east-1".to_string()),
            priority: 1,
            enabled: true,
        };

        let policy = FailoverPolicy {
            enabled: true,
            max_failures: 1,
            continue_on_cluster_failure: true,
        };

        MultiClusterOrchestration {
            name: "test-orch".to_string(),
            description: Some("Test".to_string()),
            clusters: vec![target],
            synchronization: SyncMode::Parallel,
            orchestration: serde_json::json!({}),
            failover_policy: policy,
        }
    }

    /// A freshly created orchestrator has nothing registered.
    #[test]
    fn test_multi_cluster_orchestrator_creation() {
        let orchestrator = MultiClusterOrchestrator::new();
        assert!(orchestrator.list_orchestrations().is_empty());
    }

    /// Registering one orchestration makes exactly one name listable.
    #[test]
    fn test_register_orchestration() {
        let mut orchestrator = MultiClusterOrchestrator::new();

        orchestrator.register(sample_orchestration());

        let registered = orchestrator.list_orchestrations();
        assert_eq!(registered.len(), 1);
    }
}
432}