Skip to main content

mockforge_k8s_operator/
controller.rs

//! Controller for managing ChaosOrchestration resources
2
3use crate::crd::ChaosOrchestration;
4use crate::reconciler::Reconciler;
5use crate::{OperatorError, Result};
6use futures::StreamExt;
7use kube::{
8    runtime::{
9        controller::{Action, Controller as KubeController},
10        utils::WatchStreamExt,
11        watcher::Config,
12    },
13    Api, Client, ResourceExt,
14};
15use std::sync::Arc;
16use std::time::Duration;
17use tracing::{debug, error, info};
18
19/// Main controller for the operator
20pub struct Controller {
21    client: Client,
22    reconciler: Arc<Reconciler>,
23}
24
25impl Controller {
26    /// Create a new controller
27    pub fn new(client: Client) -> Self {
28        let reconciler = Arc::new(Reconciler::new(client.clone()));
29
30        Self { client, reconciler }
31    }
32
33    /// Run the controller
34    pub async fn run(&self, namespace: Option<String>) -> Result<()> {
35        info!("Starting MockForge Kubernetes Operator");
36
37        let api: Api<ChaosOrchestration> = if let Some(ns) = namespace {
38            info!("Watching namespace: {}", ns);
39            Api::namespaced(self.client.clone(), &ns)
40        } else {
41            info!("Watching all namespaces");
42            Api::all(self.client.clone())
43        };
44
45        let reconciler = self.reconciler.clone();
46
47        KubeController::new(api.clone(), Config::default())
48            .shutdown_on_signal()
49            .run(
50                move |orchestration, _ctx| {
51                    let reconciler = reconciler.clone();
52                    async move { Self::reconcile(orchestration, reconciler).await }
53                },
54                |_orchestration, error, _ctx| {
55                    error!("Reconciliation error: {:?}", error);
56                    Action::requeue(Duration::from_secs(60))
57                },
58                Arc::new(()),
59            )
60            .for_each(|res| async move {
61                match res {
62                    Ok(o) => debug!("Reconciled: {:?}", o),
63                    Err(e) => error!("Reconcile error: {:?}", e),
64                }
65            })
66            .await;
67
68        Ok(())
69    }
70
71    /// Reconcile a single orchestration
72    async fn reconcile(
73        orchestration: Arc<ChaosOrchestration>,
74        reconciler: Arc<Reconciler>,
75    ) -> std::result::Result<Action, OperatorError> {
76        let name = orchestration.name_any();
77        let namespace = orchestration.namespace().unwrap_or_else(|| "default".to_string());
78
79        info!("Reconciling ChaosOrchestration: {}/{}", namespace, name);
80
81        match orchestration.metadata.deletion_timestamp {
82            Some(_) => {
83                // Handle deletion
84                reconciler.cleanup(&name).await?;
85                Ok(Action::await_change())
86            }
87            None => {
88                // Normal reconciliation
89                reconciler.reconcile(orchestration, &namespace).await?;
90
91                // Requeue after 30 seconds to check status
92                Ok(Action::requeue(Duration::from_secs(30)))
93            }
94        }
95    }
96
97    /// Watch for ChaosOrchestration resources
98    pub async fn watch(&self, namespace: Option<String>) -> Result<()> {
99        let api: Api<ChaosOrchestration> = if let Some(ns) = &namespace {
100            Api::namespaced(self.client.clone(), ns)
101        } else {
102            Api::all(self.client.clone())
103        };
104
105        let stream =
106            kube::runtime::watcher(api, Config::default().any_semantic()).applied_objects();
107        let mut stream = Box::pin(stream);
108
109        info!("Watching for ChaosOrchestration resources...");
110
111        while let Some(event) = stream.next().await {
112            match event {
113                Ok(orchestration) => {
114                    info!(
115                        "Detected change: {}/{}",
116                        orchestration.namespace().unwrap_or_else(|| "default".to_string()),
117                        orchestration.name_any()
118                    );
119                }
120                Err(e) => {
121                    error!("Watch error: {:?}", e);
122                }
123            }
124        }
125
126        Ok(())
127    }
128}
129
// Unit tests for Controller require a Kubernetes cluster (kube::Client).
// See reconciler.rs for tests of reconciliation logic that don't need a cluster.