Skip to main content

mockforge_k8s_operator/
controller.rs

1//! Controller for managing ChaosOrchestration resources
2
3use crate::crd::ChaosOrchestration;
4use crate::reconciler::Reconciler;
5use crate::{OperatorError, Result};
6use futures::StreamExt;
7use kube::{
8    api::ListParams,
9    runtime::{
10        controller::{Action, Controller as KubeController},
11        utils::WatchStreamExt,
12        watcher::Config,
13    },
14    Api, Client, ResourceExt,
15};
16use std::sync::Arc;
17use std::time::Duration;
18use tracing::{debug, error, info};
19
20/// Main controller for the operator
21pub struct Controller {
22    client: Client,
23    reconciler: Arc<Reconciler>,
24}
25
26impl Controller {
27    /// Create a new controller
28    pub fn new(client: Client) -> Self {
29        let reconciler = Arc::new(Reconciler::new(client.clone()));
30
31        Self { client, reconciler }
32    }
33
34    /// Run the controller
35    pub async fn run(&self, namespace: Option<String>) -> Result<()> {
36        info!("Starting MockForge Kubernetes Operator");
37
38        let api: Api<ChaosOrchestration> = if let Some(ns) = namespace {
39            info!("Watching namespace: {}", ns);
40            Api::namespaced(self.client.clone(), &ns)
41        } else {
42            info!("Watching all namespaces");
43            Api::all(self.client.clone())
44        };
45
46        let reconciler = self.reconciler.clone();
47
48        KubeController::new(api.clone(), Config::default())
49            .shutdown_on_signal()
50            .run(
51                move |orchestration, _ctx| {
52                    let reconciler = reconciler.clone();
53                    async move { Self::reconcile(orchestration, reconciler).await }
54                },
55                |_orchestration, error, _ctx| {
56                    error!("Reconciliation error: {:?}", error);
57                    Action::requeue(Duration::from_secs(60))
58                },
59                Arc::new(()),
60            )
61            .for_each(|res| async move {
62                match res {
63                    Ok(o) => debug!("Reconciled: {:?}", o),
64                    Err(e) => error!("Reconcile error: {:?}", e),
65                }
66            })
67            .await;
68
69        Ok(())
70    }
71
72    /// Reconcile a single orchestration
73    async fn reconcile(
74        orchestration: Arc<ChaosOrchestration>,
75        reconciler: Arc<Reconciler>,
76    ) -> std::result::Result<Action, OperatorError> {
77        let name = orchestration.name_any();
78        let namespace = orchestration.namespace().unwrap_or_else(|| "default".to_string());
79
80        info!("Reconciling ChaosOrchestration: {}/{}", namespace, name);
81
82        match orchestration.metadata.deletion_timestamp {
83            Some(_) => {
84                // Handle deletion
85                reconciler.cleanup(&name).await?;
86                Ok(Action::await_change())
87            }
88            None => {
89                // Normal reconciliation
90                reconciler.reconcile(orchestration, &namespace).await?;
91
92                // Requeue after 30 seconds to check status
93                Ok(Action::requeue(Duration::from_secs(30)))
94            }
95        }
96    }
97
98    /// Watch for ChaosOrchestration resources
99    pub async fn watch(&self, namespace: Option<String>) -> Result<()> {
100        let api: Api<ChaosOrchestration> = if let Some(ns) = &namespace {
101            Api::namespaced(self.client.clone(), ns)
102        } else {
103            Api::all(self.client.clone())
104        };
105
106        let stream =
107            kube::runtime::watcher(api, Config::default().any_semantic()).applied_objects();
108        let mut stream = Box::pin(stream);
109
110        info!("Watching for ChaosOrchestration resources...");
111
112        while let Some(event) = stream.next().await {
113            match event {
114                Ok(orchestration) => {
115                    info!(
116                        "Detected change: {}/{}",
117                        orchestration.namespace().unwrap_or_else(|| "default".to_string()),
118                        orchestration.name_any()
119                    );
120                }
121                Err(e) => {
122                    error!("Watch error: {:?}", e);
123                }
124            }
125        }
126
127        Ok(())
128    }
129}
130
131#[cfg(test)]
132mod tests {
133    use super::*;
134
135    #[test]
136    fn test_controller_creation() {
137        // This test requires a Kubernetes cluster, so it's just a placeholder
138        // In a real test environment, you would:
139        // 1. Create a test Kubernetes cluster (e.g., using kind)
140        // 2. Create a Client
141        // 3. Instantiate the Controller
142        // 4. Test reconciliation logic
143    }
144}