mockforge_chaos/
chaos_mesh.rs

1//! Chaos Mesh Integration
2//!
3//! Integrates MockForge with Chaos Mesh for Kubernetes-native chaos engineering.
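//! Provides builders for PodChaos, NetworkChaos (delay and packet loss) and StressChaos experiments,
//! plus status lookup, listing, pause/resume and deletion for every kind in `ExperimentType`.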
5
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use thiserror::Error;
9
10/// Chaos Mesh integration errors
11#[derive(Error, Debug)]
12pub enum ChaosMeshError {
13    #[error("Kubernetes API error: {0}")]
14    KubernetesError(String),
15
16    #[error("Experiment not found: {0}")]
17    ExperimentNotFound(String),
18
19    #[error("Invalid experiment configuration: {0}")]
20    InvalidConfig(String),
21
22    #[error("HTTP request error: {0}")]
23    RequestError(#[from] reqwest::Error),
24
25    #[error("Serialization error: {0}")]
26    SerializationError(#[from] serde_json::Error),
27}
28
29pub type Result<T> = std::result::Result<T, ChaosMeshError>;
30
31/// Chaos Mesh experiment types
32#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
33#[serde(rename_all = "lowercase")]
34pub enum ExperimentType {
35    PodChaos,
36    NetworkChaos,
37    StressChaos,
38    IOChaos,
39    TimeChaos,
40    KernelChaos,
41    DNSChaos,
42    HTTPChaos,
43    JVMChaos,
44}
45
46/// Pod chaos action
47#[derive(Debug, Clone, Serialize, Deserialize)]
48#[serde(rename_all = "kebab-case")]
49pub enum PodChaosAction {
50    PodKill,
51    PodFailure,
52    ContainerKill,
53}
54
55/// Network chaos action
56#[derive(Debug, Clone, Serialize, Deserialize)]
57#[serde(rename_all = "kebab-case")]
58pub enum NetworkChaosAction {
59    Delay,
60    Loss,
61    Duplicate,
62    Corrupt,
63    Partition,
64    Bandwidth,
65}
66
67/// Selector for targeting pods
68#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct PodSelector {
70    pub namespaces: Vec<String>,
71    #[serde(skip_serializing_if = "Option::is_none")]
72    pub label_selectors: Option<HashMap<String, String>>,
73    #[serde(skip_serializing_if = "Option::is_none")]
74    pub annotation_selectors: Option<HashMap<String, String>>,
75    #[serde(skip_serializing_if = "Option::is_none")]
76    pub field_selectors: Option<HashMap<String, String>>,
77    #[serde(skip_serializing_if = "Option::is_none")]
78    pub pod_phase_selectors: Option<Vec<String>>,
79}
80
81/// Network delay configuration
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct NetworkDelay {
84    pub latency: String, // e.g., "100ms", "1s"
85    #[serde(skip_serializing_if = "Option::is_none")]
86    pub correlation: Option<String>, // "0" to "100"
87    #[serde(skip_serializing_if = "Option::is_none")]
88    pub jitter: Option<String>, // e.g., "10ms"
89}
90
91/// Network loss configuration
92#[derive(Debug, Clone, Serialize, Deserialize)]
93pub struct NetworkLoss {
94    pub loss: String, // "0" to "100" (percentage)
95    #[serde(skip_serializing_if = "Option::is_none")]
96    pub correlation: Option<String>, // "0" to "100"
97}
98
99/// Stress test configuration
100#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct StressConfig {
102    #[serde(skip_serializing_if = "Option::is_none")]
103    pub cpu_workers: Option<u32>,
104    #[serde(skip_serializing_if = "Option::is_none")]
105    pub cpu_load: Option<u32>, // Percentage
106    #[serde(skip_serializing_if = "Option::is_none")]
107    pub memory_workers: Option<u32>,
108    #[serde(skip_serializing_if = "Option::is_none")]
109    pub memory_size: Option<String>, // e.g., "256MB", "1GB"
110}
111
112/// Experiment duration
113#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct Duration {
115    #[serde(skip_serializing_if = "Option::is_none")]
116    pub duration: Option<String>, // e.g., "30s", "5m"
117}
118
119/// Chaos experiment specification
120#[derive(Debug, Clone, Serialize, Deserialize)]
121pub struct ExperimentSpec {
122    pub selector: PodSelector,
123    pub mode: String, // "one", "all", "fixed", "fixed-percent", "random-max-percent"
124    #[serde(skip_serializing_if = "Option::is_none")]
125    pub value: Option<String>, // For "fixed" or "fixed-percent" mode
126    #[serde(skip_serializing_if = "Option::is_none")]
127    pub duration: Option<String>,
128
129    // Action-specific configurations
130    #[serde(skip_serializing_if = "Option::is_none")]
131    pub pod_action: Option<PodChaosAction>,
132    #[serde(skip_serializing_if = "Option::is_none")]
133    pub network_action: Option<NetworkChaosAction>,
134    #[serde(skip_serializing_if = "Option::is_none")]
135    pub delay: Option<NetworkDelay>,
136    #[serde(skip_serializing_if = "Option::is_none")]
137    pub loss: Option<NetworkLoss>,
138    #[serde(skip_serializing_if = "Option::is_none")]
139    pub stressors: Option<StressConfig>,
140}
141
142/// Chaos Mesh experiment
143#[derive(Debug, Clone, Serialize, Deserialize)]
144pub struct ChaosMeshExperiment {
145    #[serde(rename = "apiVersion")]
146    pub api_version: String,
147    pub kind: String,
148    pub metadata: ExperimentMetadata,
149    pub spec: ExperimentSpec,
150}
151
152/// Experiment metadata
153#[derive(Debug, Clone, Serialize, Deserialize)]
154pub struct ExperimentMetadata {
155    pub name: String,
156    pub namespace: String,
157    #[serde(skip_serializing_if = "Option::is_none")]
158    pub labels: Option<HashMap<String, String>>,
159    #[serde(skip_serializing_if = "Option::is_none")]
160    pub annotations: Option<HashMap<String, String>>,
161}
162
163/// Experiment status
164#[derive(Debug, Clone, Serialize, Deserialize)]
165pub struct ExperimentStatus {
166    pub phase: String, // "Running", "Finished", "Paused", "Failed"
167    #[serde(skip_serializing_if = "Option::is_none")]
168    pub start_time: Option<String>,
169    #[serde(skip_serializing_if = "Option::is_none")]
170    pub end_time: Option<String>,
171    #[serde(skip_serializing_if = "Option::is_none")]
172    pub conditions: Option<Vec<StatusCondition>>,
173}
174
175/// Status condition
176#[derive(Debug, Clone, Serialize, Deserialize)]
177pub struct StatusCondition {
178    #[serde(rename = "type")]
179    pub condition_type: String,
180    pub status: String,
181    #[serde(skip_serializing_if = "Option::is_none")]
182    pub reason: Option<String>,
183    #[serde(skip_serializing_if = "Option::is_none")]
184    pub message: Option<String>,
185}
186
187/// Chaos Mesh client
188pub struct ChaosMeshClient {
189    api_url: String,
190    namespace: String,
191    client: reqwest::Client,
192}
193
194impl ChaosMeshClient {
195    /// Create a new Chaos Mesh client
196    pub fn new(api_url: String, namespace: String) -> Self {
197        Self {
198            api_url,
199            namespace,
200            client: reqwest::Client::new(),
201        }
202    }
203
204    /// Create a pod chaos experiment
205    pub async fn create_pod_chaos(
206        &self,
207        name: &str,
208        action: PodChaosAction,
209        selector: PodSelector,
210        mode: &str,
211        duration: Option<&str>,
212    ) -> Result<ChaosMeshExperiment> {
213        let experiment = ChaosMeshExperiment {
214            api_version: "chaos-mesh.org/v1alpha1".to_string(),
215            kind: "PodChaos".to_string(),
216            metadata: ExperimentMetadata {
217                name: name.to_string(),
218                namespace: self.namespace.clone(),
219                labels: Some(HashMap::from([(
220                    "app.kubernetes.io/managed-by".to_string(),
221                    "mockforge".to_string(),
222                )])),
223                annotations: None,
224            },
225            spec: ExperimentSpec {
226                selector,
227                mode: mode.to_string(),
228                value: None,
229                duration: duration.map(String::from),
230                pod_action: Some(action),
231                network_action: None,
232                delay: None,
233                loss: None,
234                stressors: None,
235            },
236        };
237
238        self.create_experiment(&experiment).await
239    }
240
241    /// Create a network chaos experiment with delay
242    pub async fn create_network_delay(
243        &self,
244        name: &str,
245        selector: PodSelector,
246        latency: &str,
247        jitter: Option<&str>,
248        duration: Option<&str>,
249    ) -> Result<ChaosMeshExperiment> {
250        let experiment = ChaosMeshExperiment {
251            api_version: "chaos-mesh.org/v1alpha1".to_string(),
252            kind: "NetworkChaos".to_string(),
253            metadata: ExperimentMetadata {
254                name: name.to_string(),
255                namespace: self.namespace.clone(),
256                labels: Some(HashMap::from([(
257                    "app.kubernetes.io/managed-by".to_string(),
258                    "mockforge".to_string(),
259                )])),
260                annotations: None,
261            },
262            spec: ExperimentSpec {
263                selector,
264                mode: "all".to_string(),
265                value: None,
266                duration: duration.map(String::from),
267                pod_action: None,
268                network_action: Some(NetworkChaosAction::Delay),
269                delay: Some(NetworkDelay {
270                    latency: latency.to_string(),
271                    correlation: None,
272                    jitter: jitter.map(String::from),
273                }),
274                loss: None,
275                stressors: None,
276            },
277        };
278
279        self.create_experiment(&experiment).await
280    }
281
282    /// Create a network packet loss experiment
283    pub async fn create_network_loss(
284        &self,
285        name: &str,
286        selector: PodSelector,
287        loss_percent: &str,
288        duration: Option<&str>,
289    ) -> Result<ChaosMeshExperiment> {
290        let experiment = ChaosMeshExperiment {
291            api_version: "chaos-mesh.org/v1alpha1".to_string(),
292            kind: "NetworkChaos".to_string(),
293            metadata: ExperimentMetadata {
294                name: name.to_string(),
295                namespace: self.namespace.clone(),
296                labels: Some(HashMap::from([(
297                    "app.kubernetes.io/managed-by".to_string(),
298                    "mockforge".to_string(),
299                )])),
300                annotations: None,
301            },
302            spec: ExperimentSpec {
303                selector,
304                mode: "all".to_string(),
305                value: None,
306                duration: duration.map(String::from),
307                pod_action: None,
308                network_action: Some(NetworkChaosAction::Loss),
309                delay: None,
310                loss: Some(NetworkLoss {
311                    loss: loss_percent.to_string(),
312                    correlation: None,
313                }),
314                stressors: None,
315            },
316        };
317
318        self.create_experiment(&experiment).await
319    }
320
321    /// Create a stress chaos experiment
322    pub async fn create_stress_chaos(
323        &self,
324        name: &str,
325        selector: PodSelector,
326        stressors: StressConfig,
327        duration: Option<&str>,
328    ) -> Result<ChaosMeshExperiment> {
329        let experiment = ChaosMeshExperiment {
330            api_version: "chaos-mesh.org/v1alpha1".to_string(),
331            kind: "StressChaos".to_string(),
332            metadata: ExperimentMetadata {
333                name: name.to_string(),
334                namespace: self.namespace.clone(),
335                labels: Some(HashMap::from([(
336                    "app.kubernetes.io/managed-by".to_string(),
337                    "mockforge".to_string(),
338                )])),
339                annotations: None,
340            },
341            spec: ExperimentSpec {
342                selector,
343                mode: "all".to_string(),
344                value: None,
345                duration: duration.map(String::from),
346                pod_action: None,
347                network_action: None,
348                delay: None,
349                loss: None,
350                stressors: Some(stressors),
351            },
352        };
353
354        self.create_experiment(&experiment).await
355    }
356
357    /// Create an experiment
358    async fn create_experiment(
359        &self,
360        experiment: &ChaosMeshExperiment,
361    ) -> Result<ChaosMeshExperiment> {
362        let url = format!(
363            "{}/apis/chaos-mesh.org/v1alpha1/namespaces/{}/{}s",
364            self.api_url,
365            self.namespace,
366            experiment.kind.to_lowercase()
367        );
368
369        let response = self.client.post(&url).json(experiment).send().await?;
370
371        if !response.status().is_success() {
372            let error = response.text().await?;
373            return Err(ChaosMeshError::KubernetesError(error));
374        }
375
376        let created: ChaosMeshExperiment = response.json().await?;
377        Ok(created)
378    }
379
380    /// Get experiment status
381    pub async fn get_experiment_status(
382        &self,
383        experiment_type: &ExperimentType,
384        name: &str,
385    ) -> Result<ExperimentStatus> {
386        let kind = match experiment_type {
387            ExperimentType::PodChaos => "podchaos",
388            ExperimentType::NetworkChaos => "networkchaos",
389            ExperimentType::StressChaos => "stresschaos",
390            ExperimentType::IOChaos => "iochaos",
391            ExperimentType::TimeChaos => "timechaos",
392            ExperimentType::KernelChaos => "kernelchaos",
393            ExperimentType::DNSChaos => "dnschaos",
394            ExperimentType::HTTPChaos => "httpchaos",
395            ExperimentType::JVMChaos => "jvmchaos",
396        };
397
398        let url = format!(
399            "{}/apis/chaos-mesh.org/v1alpha1/namespaces/{}/{}es/{}/status",
400            self.api_url, self.namespace, kind, name
401        );
402
403        let response = self.client.get(&url).send().await?;
404
405        if !response.status().is_success() {
406            return Err(ChaosMeshError::ExperimentNotFound(name.to_string()));
407        }
408
409        let status: ExperimentStatus = response.json().await?;
410        Ok(status)
411    }
412
413    /// Delete an experiment
414    pub async fn delete_experiment(
415        &self,
416        experiment_type: &ExperimentType,
417        name: &str,
418    ) -> Result<()> {
419        let kind = match experiment_type {
420            ExperimentType::PodChaos => "podchaos",
421            ExperimentType::NetworkChaos => "networkchaos",
422            ExperimentType::StressChaos => "stresschaos",
423            ExperimentType::IOChaos => "iochaos",
424            ExperimentType::TimeChaos => "timechaos",
425            ExperimentType::KernelChaos => "kernelchaos",
426            ExperimentType::DNSChaos => "dnschaos",
427            ExperimentType::HTTPChaos => "httpchaos",
428            ExperimentType::JVMChaos => "jvmchaos",
429        };
430
431        let url = format!(
432            "{}/apis/chaos-mesh.org/v1alpha1/namespaces/{}/{}es/{}",
433            self.api_url, self.namespace, kind, name
434        );
435
436        let response = self.client.delete(&url).send().await?;
437
438        if !response.status().is_success() {
439            let error = response.text().await?;
440            return Err(ChaosMeshError::KubernetesError(error));
441        }
442
443        Ok(())
444    }
445
446    /// Pause an experiment
447    pub async fn pause_experiment(
448        &self,
449        experiment_type: &ExperimentType,
450        name: &str,
451    ) -> Result<()> {
452        self.update_experiment_annotation(
453            experiment_type,
454            name,
455            "experiment.chaos-mesh.org/pause",
456            "true",
457        )
458        .await
459    }
460
461    /// Resume a paused experiment
462    pub async fn resume_experiment(
463        &self,
464        experiment_type: &ExperimentType,
465        name: &str,
466    ) -> Result<()> {
467        self.update_experiment_annotation(
468            experiment_type,
469            name,
470            "experiment.chaos-mesh.org/pause",
471            "false",
472        )
473        .await
474    }
475
476    /// Update experiment annotation
477    async fn update_experiment_annotation(
478        &self,
479        experiment_type: &ExperimentType,
480        name: &str,
481        annotation_key: &str,
482        annotation_value: &str,
483    ) -> Result<()> {
484        let kind = match experiment_type {
485            ExperimentType::PodChaos => "podchaos",
486            ExperimentType::NetworkChaos => "networkchaos",
487            ExperimentType::StressChaos => "stresschaos",
488            ExperimentType::IOChaos => "iochaos",
489            ExperimentType::TimeChaos => "timechaos",
490            ExperimentType::KernelChaos => "kernelchaos",
491            ExperimentType::DNSChaos => "dnschaos",
492            ExperimentType::HTTPChaos => "httpchaos",
493            ExperimentType::JVMChaos => "jvmchaos",
494        };
495
496        let url = format!(
497            "{}/apis/chaos-mesh.org/v1alpha1/namespaces/{}/{}es/{}",
498            self.api_url, self.namespace, kind, name
499        );
500
501        let patch = serde_json::json!({
502            "metadata": {
503                "annotations": {
504                    annotation_key: annotation_value
505                }
506            }
507        });
508
509        let response = self
510            .client
511            .patch(&url)
512            .header("Content-Type", "application/merge-patch+json")
513            .json(&patch)
514            .send()
515            .await?;
516
517        if !response.status().is_success() {
518            let error = response.text().await?;
519            return Err(ChaosMeshError::KubernetesError(error));
520        }
521
522        Ok(())
523    }
524
525    /// List all experiments in namespace
526    pub async fn list_experiments(
527        &self,
528        experiment_type: &ExperimentType,
529    ) -> Result<Vec<ChaosMeshExperiment>> {
530        let kind = match experiment_type {
531            ExperimentType::PodChaos => "podchaos",
532            ExperimentType::NetworkChaos => "networkchaos",
533            ExperimentType::StressChaos => "stresschaos",
534            ExperimentType::IOChaos => "iochaos",
535            ExperimentType::TimeChaos => "timechaos",
536            ExperimentType::KernelChaos => "kernelchaos",
537            ExperimentType::DNSChaos => "dnschaos",
538            ExperimentType::HTTPChaos => "httpchaos",
539            ExperimentType::JVMChaos => "jvmchaos",
540        };
541
542        let url = format!(
543            "{}/apis/chaos-mesh.org/v1alpha1/namespaces/{}/{}es",
544            self.api_url, self.namespace, kind
545        );
546
547        let response = self.client.get(&url).send().await?;
548
549        if !response.status().is_success() {
550            let error = response.text().await?;
551            return Err(ChaosMeshError::KubernetesError(error));
552        }
553
554        #[derive(Deserialize)]
555        struct ExperimentList {
556            items: Vec<ChaosMeshExperiment>,
557        }
558
559        let list: ExperimentList = response.json().await?;
560        Ok(list.items)
561    }
562}
563
#[cfg(test)]
mod tests {
    use super::*;

    /// A minimal PodChaos resource serializes with its kind and name present.
    #[test]
    fn test_experiment_serialization() {
        let app_labels = HashMap::from([("app".to_string(), "test".to_string())]);

        let spec = ExperimentSpec {
            selector: PodSelector {
                namespaces: vec!["default".to_string()],
                label_selectors: Some(app_labels),
                annotation_selectors: None,
                field_selectors: None,
                pod_phase_selectors: None,
            },
            mode: "one".to_string(),
            value: None,
            duration: Some("30s".to_string()),
            pod_action: Some(PodChaosAction::PodKill),
            network_action: None,
            delay: None,
            loss: None,
            stressors: None,
        };

        let resource = ChaosMeshExperiment {
            api_version: "chaos-mesh.org/v1alpha1".to_string(),
            kind: "PodChaos".to_string(),
            metadata: ExperimentMetadata {
                name: "test-chaos".to_string(),
                namespace: "default".to_string(),
                labels: None,
                annotations: None,
            },
            spec,
        };

        let rendered =
            serde_json::to_string_pretty(&resource).expect("experiment should serialize");
        for needle in ["PodChaos", "test-chaos"] {
            assert!(rendered.contains(needle), "missing {:?} in {}", needle, rendered);
        }
    }

    /// Latency and jitter strings survive serialization.
    #[test]
    fn test_network_delay_config() {
        let rendered = serde_json::to_string(&NetworkDelay {
            latency: "100ms".to_string(),
            correlation: Some("50".to_string()),
            jitter: Some("10ms".to_string()),
        })
        .expect("delay should serialize");

        for needle in ["100ms", "10ms"] {
            assert!(rendered.contains(needle), "missing {:?} in {}", needle, rendered);
        }
    }

    /// The memory size string survives serialization.
    #[test]
    fn test_stress_config() {
        let rendered = serde_json::to_string(&StressConfig {
            cpu_workers: Some(2),
            cpu_load: Some(50),
            memory_workers: Some(1),
            memory_size: Some("256MB".to_string()),
        })
        .expect("stress config should serialize");

        assert!(rendered.contains("256MB"), "missing \"256MB\" in {}", rendered);
    }
}