mockforge_k8s_operator/
controller.rs1use crate::crd::ChaosOrchestration;
4use crate::reconciler::Reconciler;
5use crate::{OperatorError, Result};
6use futures::StreamExt;
7use kube::{
8 api::ListParams,
9 runtime::{
10 controller::{Action, Controller as KubeController},
11 utils::WatchStreamExt,
12 watcher::Config,
13 },
14 Api, Client, ResourceExt,
15};
16use std::sync::Arc;
17use std::time::Duration;
18use tracing::{debug, error, info};
19
20pub struct Controller {
22 client: Client,
23 reconciler: Arc<Reconciler>,
24}
25
26impl Controller {
27 pub fn new(client: Client) -> Self {
29 let reconciler = Arc::new(Reconciler::new(client.clone()));
30
31 Self { client, reconciler }
32 }
33
34 pub async fn run(&self, namespace: Option<String>) -> Result<()> {
36 info!("Starting MockForge Kubernetes Operator");
37
38 let api: Api<ChaosOrchestration> = if let Some(ns) = namespace {
39 info!("Watching namespace: {}", ns);
40 Api::namespaced(self.client.clone(), &ns)
41 } else {
42 info!("Watching all namespaces");
43 Api::all(self.client.clone())
44 };
45
46 let reconciler = self.reconciler.clone();
47
48 KubeController::new(api.clone(), Config::default())
49 .shutdown_on_signal()
50 .run(
51 move |orchestration, _ctx| {
52 let reconciler = reconciler.clone();
53 async move { Self::reconcile(orchestration, reconciler).await }
54 },
55 |_orchestration, error, _ctx| {
56 error!("Reconciliation error: {:?}", error);
57 Action::requeue(Duration::from_secs(60))
58 },
59 Arc::new(()),
60 )
61 .for_each(|res| async move {
62 match res {
63 Ok(o) => debug!("Reconciled: {:?}", o),
64 Err(e) => error!("Reconcile error: {:?}", e),
65 }
66 })
67 .await;
68
69 Ok(())
70 }
71
72 async fn reconcile(
74 orchestration: Arc<ChaosOrchestration>,
75 reconciler: Arc<Reconciler>,
76 ) -> std::result::Result<Action, OperatorError> {
77 let name = orchestration.name_any();
78 let namespace = orchestration.namespace().unwrap_or_else(|| "default".to_string());
79
80 info!("Reconciling ChaosOrchestration: {}/{}", namespace, name);
81
82 match orchestration.metadata.deletion_timestamp {
83 Some(_) => {
84 reconciler.cleanup(&name).await?;
86 Ok(Action::await_change())
87 }
88 None => {
89 reconciler.reconcile(orchestration, &namespace).await?;
91
92 Ok(Action::requeue(Duration::from_secs(30)))
94 }
95 }
96 }
97
98 pub async fn watch(&self, namespace: Option<String>) -> Result<()> {
100 let api: Api<ChaosOrchestration> = if let Some(ns) = &namespace {
101 Api::namespaced(self.client.clone(), ns)
102 } else {
103 Api::all(self.client.clone())
104 };
105
106 let stream =
107 kube::runtime::watcher(api, Config::default().any_semantic()).applied_objects();
108 let mut stream = Box::pin(stream);
109
110 info!("Watching for ChaosOrchestration resources...");
111
112 while let Some(event) = stream.next().await {
113 match event {
114 Ok(orchestration) => {
115 info!(
116 "Detected change: {}/{}",
117 orchestration.namespace().unwrap_or_else(|| "default".to_string()),
118 orchestration.name_any()
119 );
120 }
121 Err(e) => {
122 error!("Watch error: {:?}", e);
123 }
124 }
125 }
126
127 Ok(())
128 }
129}
130
#[cfg(test)]
mod tests {
    use super::*;

    /// Placeholder test: the body is intentionally empty and asserts nothing.
    ///
    /// `Controller::new` needs a `Client`; presumably none can be built in a
    /// unit test without a reachable cluster — TODO confirm and replace with
    /// a real test once a mock client is available.
    #[test]
    fn test_controller_creation() {}
}