mockforge_k8s_operator/
controller.rs1use crate::crd::ChaosOrchestration;
4use crate::reconciler::Reconciler;
5use crate::{OperatorError, Result};
6use futures::StreamExt;
7use kube::{
8 runtime::{
9 controller::{Action, Controller as KubeController},
10 utils::WatchStreamExt,
11 watcher::Config,
12 },
13 Api, Client, ResourceExt,
14};
15use std::sync::Arc;
16use std::time::Duration;
17use tracing::{debug, error, info};
18
19pub struct Controller {
21 client: Client,
22 reconciler: Arc<Reconciler>,
23}
24
25impl Controller {
26 pub fn new(client: Client) -> Self {
28 let reconciler = Arc::new(Reconciler::new(client.clone()));
29
30 Self { client, reconciler }
31 }
32
33 pub async fn run(&self, namespace: Option<String>) -> Result<()> {
35 info!("Starting MockForge Kubernetes Operator");
36
37 let api: Api<ChaosOrchestration> = if let Some(ns) = namespace {
38 info!("Watching namespace: {}", ns);
39 Api::namespaced(self.client.clone(), &ns)
40 } else {
41 info!("Watching all namespaces");
42 Api::all(self.client.clone())
43 };
44
45 let reconciler = self.reconciler.clone();
46
47 KubeController::new(api.clone(), Config::default())
48 .shutdown_on_signal()
49 .run(
50 move |orchestration, _ctx| {
51 let reconciler = reconciler.clone();
52 async move { Self::reconcile(orchestration, reconciler).await }
53 },
54 |_orchestration, error, _ctx| {
55 error!("Reconciliation error: {:?}", error);
56 Action::requeue(Duration::from_secs(60))
57 },
58 Arc::new(()),
59 )
60 .for_each(|res| async move {
61 match res {
62 Ok(o) => debug!("Reconciled: {:?}", o),
63 Err(e) => error!("Reconcile error: {:?}", e),
64 }
65 })
66 .await;
67
68 Ok(())
69 }
70
71 async fn reconcile(
73 orchestration: Arc<ChaosOrchestration>,
74 reconciler: Arc<Reconciler>,
75 ) -> std::result::Result<Action, OperatorError> {
76 let name = orchestration.name_any();
77 let namespace = orchestration.namespace().unwrap_or_else(|| "default".to_string());
78
79 info!("Reconciling ChaosOrchestration: {}/{}", namespace, name);
80
81 match orchestration.metadata.deletion_timestamp {
82 Some(_) => {
83 reconciler.cleanup(&name).await?;
85 Ok(Action::await_change())
86 }
87 None => {
88 reconciler.reconcile(orchestration, &namespace).await?;
90
91 Ok(Action::requeue(Duration::from_secs(30)))
93 }
94 }
95 }
96
97 pub async fn watch(&self, namespace: Option<String>) -> Result<()> {
99 let api: Api<ChaosOrchestration> = if let Some(ns) = &namespace {
100 Api::namespaced(self.client.clone(), ns)
101 } else {
102 Api::all(self.client.clone())
103 };
104
105 let stream =
106 kube::runtime::watcher(api, Config::default().any_semantic()).applied_objects();
107 let mut stream = Box::pin(stream);
108
109 info!("Watching for ChaosOrchestration resources...");
110
111 while let Some(event) = stream.next().await {
112 match event {
113 Ok(orchestration) => {
114 info!(
115 "Detected change: {}/{}",
116 orchestration.namespace().unwrap_or_else(|| "default".to_string()),
117 orchestration.name_any()
118 );
119 }
120 Err(e) => {
121 error!("Watch error: {:?}", e);
122 }
123 }
124 }
125
126 Ok(())
127 }
128}
129
130