kaniop_operator/controller/
mod.rs1pub mod context;
2pub mod kanidm;
3
4use self::{context::Context, kanidm::KanidmClients};
5
6use crate::kanidm::crd::Kanidm;
7use crate::metrics;
8use kaniop_k8s_util::error::{Error, Result};
9
10use kaniop_k8s_util::types::short_type_name;
11
12use std::fmt::Debug;
13use std::sync::Arc;
14
15use futures::channel::mpsc;
16use futures::future::BoxFuture;
17use futures::{FutureExt, StreamExt};
18use k8s_openapi::api::core::v1::Namespace;
19use kube::Resource;
20use kube::api::{Api, ListParams, PartialObjectMeta, ResourceExt};
21use kube::client::Client;
22use kube::runtime::controller::Action;
23use kube::runtime::events::Recorder;
24use kube::runtime::reflector::store::Writer;
25use kube::runtime::reflector::{self, Lookup, ReflectHandle, Store};
26use kube::runtime::{WatchStreamExt, metadata_watcher, watcher};
27use prometheus_client::registry::Registry;
28use serde::de::DeserializeOwned;
29use tokio::sync::RwLock;
30use tokio::time::Duration;
31use tracing::{debug, error, trace};
32
/// Default reconcile interval: 60 seconds.
// NOTE(review): the consumers of this constant are outside this view — confirm
// whether it is used as the requeue delay after a successful reconciliation.
pub const DEFAULT_RECONCILE_INTERVAL: Duration = Duration::from_secs(60);
/// Buffer size for subscriber stores.
// presumably the `buffer_size` handed to `create_subscriber` — verify against callers
pub const SUBSCRIBE_BUFFER_SIZE: usize = 256;
/// Buffer size for reload channels.
// presumably the capacity of the `reload_tx` channel the watchers send on — verify against callers
pub const RELOAD_BUFFER_SIZE: usize = 16;
/// Label key `app.kubernetes.io/name`.
pub const NAME_LABEL: &str = "app.kubernetes.io/name";
/// Label key `app.kubernetes.io/instance`.
pub const INSTANCE_LABEL: &str = "app.kubernetes.io/instance";
/// Label key `app.kubernetes.io/managed-by`.
///
/// The watchers in this module select objects whose value for this label is
/// `kaniop-<controller_id>`.
pub const MANAGED_BY_LABEL: &str = "app.kubernetes.io/managed-by";

/// Static identifier of a controller.
///
/// In this module it is the key for the per-controller metrics lookup, the
/// event recorder's reporter name, and the suffix of the watcher label selector.
pub type ControllerId = &'static str;
41
/// Operator-wide shared state from which per-controller [`Context`] values are
/// built (see [`State::to_context`]).
#[derive(Clone)]
pub struct State {
    /// Metrics handle: owns the registry encoded by [`State::metrics`] and the
    /// per-controller metrics looked up in [`State::to_context`].
    metrics: Arc<metrics::Metrics>,
    /// Shared, lock-protected Kanidm client collection; created with
    /// `Arc::default()` and cloned into every context.
    // NOTE(review): presumably clients for the `idm_admin` account — confirm in `kanidm.rs`.
    idm_clients: Arc<RwLock<KanidmClients>>,
    /// Second shared, lock-protected Kanidm client collection; created with
    /// `Arc::default()` and cloned into every context.
    // NOTE(review): presumably clients for the system `admin` account — confirm in `kanidm.rs`.
    system_clients: Arc<RwLock<KanidmClients>>,
    /// Reflector store of `Namespace` objects, cloned into every context.
    pub namespace_store: Store<Namespace>,
    /// Reflector store of `Kanidm` custom resources, cloned into every context.
    pub kanidm_store: Store<Kanidm>,
    /// Optional Kubernetes client.
    // NOTE(review): not read anywhere in this view; when it is `None` is decided by callers — confirm.
    pub client: Option<Client>,
}
60
/// The three handles that belong to one shared reflector store, as produced by
/// [`create_subscriber`].
pub struct ResourceReflector<K>
where
    K: Resource + Lookup + Clone + 'static,
    <K as Lookup>::DynamicType: Eq + std::hash::Hash + Clone,
{
    /// Read handle of the shared store.
    pub store: Store<K>,
    /// Write handle of the shared store; the watcher helpers in this module take
    /// a `Writer` and feed it through `reflect_shared`.
    pub writer: Writer<K>,
    /// Subscription obtained from `writer.subscribe()`.
    pub subscriber: ReflectHandle<K>,
}
71
72impl State {
74 pub fn new(
75 registry: Registry,
76 controller_names: &[&'static str],
77 namespace_store: Store<Namespace>,
78 kanidm_store: Store<Kanidm>,
79 client: Option<Client>,
80 ) -> Self {
81 Self {
82 metrics: Arc::new(metrics::Metrics::new(registry, controller_names)),
83 idm_clients: Arc::default(),
84 system_clients: Arc::default(),
85 namespace_store,
86 kanidm_store,
87 client,
88 }
89 }
90
91 pub fn metrics(&self) -> Result<String> {
93 let mut buffer = String::new();
94 let registry = &*self.metrics.registry;
95 prometheus_client::encoding::text::encode(&mut buffer, registry)
96 .map_err(|e| Error::FormattingError("failed to encode metrics".to_string(), e))?;
97 Ok(buffer)
98 }
99
100 pub fn to_context<K>(&self, client: Client, controller_id: ControllerId) -> Context<K>
102 where
103 K: Resource + Lookup + Clone + 'static,
104 <K as Lookup>::DynamicType: Default + Eq + std::hash::Hash + Clone,
105 {
106 Context::new(
107 controller_id,
108 client.clone(),
109 self.metrics
110 .controllers
111 .get(controller_id)
112 .expect("all CONTROLLER_IDs have to be registered")
113 .clone(),
114 Recorder::new(client.clone(), controller_id.into()),
115 self.idm_clients.clone(),
116 self.system_clients.clone(),
117 self.namespace_store.clone(),
118 self.kanidm_store.clone(),
119 )
120 }
121}
122
123pub async fn check_api_queryable<K>(client: Client) -> Api<K>
124where
125 K: Resource + Clone + DeserializeOwned + Debug,
126 <K as Resource>::DynamicType: Default,
127{
128 let api = Api::<K>::all(client.clone());
129 if let Err(e) = api.list(&ListParams::default().limit(1)).await {
130 error!(
131 "{} is not queryable; {e:?}. Check controller permissions",
132 short_type_name::<K>().unwrap_or("Unknown resource"),
133 );
134 std::process::exit(1);
135 }
136 api
137}
138
139pub fn create_subscriber<K>(buffer_size: usize) -> ResourceReflector<K>
140where
141 K: Resource + Lookup + Clone + 'static,
142 <K as Lookup>::DynamicType: Default + Eq + std::hash::Hash + Clone,
143 <K as Resource>::DynamicType: Default + Eq + std::hash::Hash + Clone,
144{
145 let (store, writer) = reflector::store_shared::<K>(buffer_size);
146 let subscriber = writer
147 .subscribe()
148 .expect("subscribers can only be created from shared stores");
149
150 ResourceReflector {
151 store,
152 writer,
153 subscriber,
154 }
155}
156
157fn create_generic_watcher<K, W, T, S, StreamT>(
158 api: Api<K>,
159 writer: Writer<W>,
160 reload_tx: mpsc::Sender<()>,
161 controller_id: ControllerId,
162 ctx: Arc<Context<T>>,
163 stream_fn: S,
164) -> BoxFuture<'static, ()>
165where
166 K: Resource + Lookup + Clone + DeserializeOwned + Send + Sync + Debug + 'static,
167 W: Resource + ResourceExt + Lookup + Clone + Debug + Send + Sync + 'static,
168 <W as Resource>::DynamicType: Eq + std::hash::Hash + Clone + Send + Sync,
169 <K as Lookup>::DynamicType: Default + Eq + std::hash::Hash + Clone + Send + Sync,
170 <K as Resource>::DynamicType: Default + Eq + std::hash::Hash + Clone,
171 <W as Lookup>::DynamicType: Eq + std::hash::Hash + Clone + Send + Sync,
172 T: Resource<DynamicType = ()> + ResourceExt + Lookup + Clone + 'static,
173 <T as Lookup>::DynamicType: Eq + std::hash::Hash + Clone + Send + Sync,
174 S: Fn(Api<K>, watcher::Config) -> StreamT + 'static,
175 StreamT: futures::Stream<Item = Result<watcher::Event<W>, kube::runtime::watcher::Error>>
176 + Send
177 + 'static,
178{
179 let resource_name = short_type_name::<K>().unwrap_or("Unknown");
180
181 stream_fn(
182 api,
183 watcher::Config::default().labels(&format!("{MANAGED_BY_LABEL}=kaniop-{controller_id}")),
184 )
185 .default_backoff()
186 .reflect_shared(writer)
187 .for_each(move |res| {
188 let mut reload_tx_clone = reload_tx.clone();
189 let ctx = ctx.clone();
190 async move {
191 match res {
192 Ok(event) => {
193 trace!(msg = "watched event", ?event);
194 match event {
195 watcher::Event::Delete(d) => {
196 debug!(
197 msg = format!("delete event for {resource_name} trigger reconcile"),
198 namespace = ResourceExt::namespace(&d).unwrap(),
199 name = d.name_any()
200 );
201
202 let _ignore_errors = reload_tx_clone.try_send(()).map_err(
206 |e| error!(msg = "failed to trigger reconcile on delete", %e),
207 );
208 ctx.metrics
209 .triggered_inc(metrics::Action::Delete, resource_name);
210 }
211 watcher::Event::Apply(d) => {
212 debug!(
213 msg = format!("apply event for {resource_name} trigger reconcile"),
214 namespace = ResourceExt::namespace(&d).unwrap(),
215 name = d.name_any()
216 );
217 ctx.metrics
218 .triggered_inc(metrics::Action::Apply, resource_name);
219 }
220 _ => {}
221 }
222 }
223 Err(e) => {
224 error!(msg = format!("unexpected error when watching {resource_name}"), %e);
225 ctx.metrics.watch_operations_failed_inc();
226 }
227 }
228 }
229 })
230 .boxed()
231}
232
233pub fn create_watcher<K, T>(
234 api: Api<K>,
235 writer: Writer<K>,
236 reload_tx: mpsc::Sender<()>,
237 controller_id: ControllerId,
238 ctx: Arc<Context<T>>,
239) -> BoxFuture<'static, ()>
240where
241 K: Resource + Lookup + Clone + DeserializeOwned + Send + Sync + Debug + 'static,
242 <K as Lookup>::DynamicType: Default + Eq + std::hash::Hash + Clone + Send + Sync,
243 <K as Resource>::DynamicType: Default + Eq + std::hash::Hash + Clone,
244 T: Resource<DynamicType = ()> + ResourceExt + Lookup + Clone + 'static,
245 <T as Lookup>::DynamicType: Eq + std::hash::Hash + Clone + Send + Sync,
246{
247 create_generic_watcher::<K, K, T, _, _>(api, writer, reload_tx, controller_id, ctx, watcher)
248}
249
250pub fn create_metadata_watcher<K, T>(
251 api: Api<K>,
252 writer: Writer<PartialObjectMeta<K>>,
253 reload_tx: mpsc::Sender<()>,
254 controller_id: ControllerId,
255 ctx: Arc<Context<T>>,
256) -> BoxFuture<'static, ()>
257where
258 K: Resource + Lookup + Clone + DeserializeOwned + Send + Sync + Debug + 'static,
259 <K as Lookup>::DynamicType: Default + Eq + std::hash::Hash + Clone + Send + Sync,
260 <K as Resource>::DynamicType: Default + Eq + std::hash::Hash + Clone,
261 T: Resource<DynamicType = ()> + ResourceExt + Lookup + Clone + 'static,
262 <T as Lookup>::DynamicType: Eq + std::hash::Hash + Clone + Send + Sync,
263{
264 create_generic_watcher::<K, PartialObjectMeta<K>, T, _, _>(
265 api,
266 writer,
267 reload_tx,
268 controller_id,
269 ctx,
270 metadata_watcher,
271 )
272}
273
274pub fn error_policy<K>(_obj: Arc<K>, _error: &Error, _ctx: Arc<Context<K>>) -> Action
275where
276 K: Resource + Lookup + Clone + 'static,
277 <K as Lookup>::DynamicType: Default + Eq + std::hash::Hash + Clone,
278{
279 unreachable!("Handle in backoff_reconciler macro")
280}
281
/// Wraps a reconcile function in a closure that handles failures with a
/// per-object backoff.
///
/// The closure calls `$inner_reconciler(obj, ctx)`:
///
/// * On success, the backoff stored for the object is reset and the returned
///   action is passed through.
/// * On failure, the error is logged, the reconcile-failure metric is
///   incremented and the object is requeued after the duration returned by
///   `get_backoff`. The closure still returns `Ok`, so the error never reaches
///   the controller's error policy.
///
/// The failure branch unwraps the object's namespace and therefore panics for
/// an object that has none.
#[macro_export]
macro_rules! backoff_reconciler {
    ($inner_reconciler:ident) => {
        |obj, ctx| async move {
            use $crate::controller::context::BackoffContext;

            let outcome = $inner_reconciler(obj.clone(), ctx.clone()).await;
            match outcome {
                Err(error) => {
                    let namespace = kube::ResourceExt::namespace(obj.as_ref()).unwrap();
                    let name = kube::ResourceExt::name_any(obj.as_ref());
                    tracing::error!(msg = "failed reconciliation", %namespace, %name, %error);
                    ctx.metrics().reconcile_failure_inc();

                    let backoff_duration = ctx
                        .get_backoff(kube::runtime::reflector::ObjectRef::from(obj.as_ref()))
                        .await;
                    tracing::trace!(
                        msg = format!("backoff duration: {backoff_duration:?}"),
                        %namespace,
                        %name,
                    );

                    let requeue = kube::runtime::controller::Action::requeue(backoff_duration);
                    Ok(requeue)
                }
                Ok(action) => {
                    // A successful run clears any backoff accumulated for this object.
                    ctx.reset_backoff(kube::runtime::reflector::ObjectRef::from(obj.as_ref()))
                        .await;
                    Ok(action)
                }
            }
        }
    };
}