1use super::{
2 ControllerId, DEFAULT_RECONCILE_INTERVAL, KanidmClients,
3 kanidm::{ClientLockKey, KanidmKey, KanidmResource, KanidmUser},
4};
5
6use crate::kanidm::crd::Kanidm;
7use crate::metrics::ControllerMetrics;
8use kaniop_k8s_util::error::{Error, Result};
9
10use kaniop_k8s_util::types::short_type_name;
11
12use std::collections::HashMap;
13use std::sync::Arc;
14
15use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder};
16use k8s_openapi::{NamespaceResourceScope, api::core::v1::Namespace};
17use kanidm_client::KanidmClient;
18use kube::runtime::events::{Event, EventType, Recorder};
19use kube::{Api, client::Client};
20use kube::{Resource, ResourceExt};
21use kube::{
22 api::{Patch, PatchParams},
23 runtime::reflector::{Lookup, ObjectRef, Store},
24};
25use serde::{Deserialize, Serialize};
26use tokio::sync::{Mutex, RwLock};
27use tokio::time::Duration;
28use tracing::{debug, error, info, trace};
29
30#[derive(Clone)]
32pub struct Context<K: Resource> {
33 pub controller_id: ControllerId,
35 pub client: Client,
37 pub metrics: Arc<ControllerMetrics>,
39 error_backoff_cache: Arc<RwLock<HashMap<ObjectRef<K>, RwLock<ExponentialBackoff>>>>,
41 pub recorder: Recorder,
43 pub namespace_store: Store<Namespace>,
45 pub kanidm_store: Store<Kanidm>,
47 idm_clients: Arc<RwLock<KanidmClients>>,
49 system_clients: Arc<RwLock<KanidmClients>>,
52 client_creation_locks: Arc<RwLock<HashMap<ClientLockKey, Arc<Mutex<()>>>>>,
54}
55
56impl<K> Context<K>
57where
58 K: Resource + ResourceExt + Lookup + Clone + 'static,
59 <K as Lookup>::DynamicType: Eq + std::hash::Hash + Clone,
60{
61 #[allow(clippy::too_many_arguments)]
62 pub fn new(
63 controller_id: ControllerId,
64 client: Client,
65 metrics: Arc<ControllerMetrics>,
66 recorder: Recorder,
67 idm_clients: Arc<RwLock<KanidmClients>>,
68 system_clients: Arc<RwLock<KanidmClients>>,
69 namespace_store: Store<Namespace>,
70 kanidm_store: Store<Kanidm>,
71 ) -> Self {
72 Self {
73 controller_id,
74 client,
75 metrics,
76 recorder,
77 namespace_store,
78 kanidm_store,
79 idm_clients,
80 system_clients,
81 error_backoff_cache: Arc::default(),
82 client_creation_locks: Arc::default(),
83 }
84 }
85
86 pub async fn release_kanidm_clients(&self, kanidm: &Kanidm) {
87 let key = KanidmKey {
88 namespace: kube::ResourceExt::namespace(kanidm).unwrap(),
90 name: kanidm.name_any(),
91 };
92
93 self.idm_clients.write().await.remove(&key);
94 self.system_clients.write().await.remove(&key);
95 {
96 let mut locks = self.client_creation_locks.write().await;
97 locks.remove(&ClientLockKey {
98 namespace: key.namespace.clone(),
99 name: key.name.clone(),
100 user: KanidmUser::IdmAdmin,
101 });
102 locks.remove(&ClientLockKey {
103 namespace: key.namespace.clone(),
104 name: key.name.clone(),
105 user: KanidmUser::Admin,
106 });
107 }
108 }
109}
110
111impl<K> Context<K>
112where
113 K: Resource<DynamicType = ()> + ResourceExt + KanidmResource + Lookup + Clone + 'static,
114 <K as Lookup>::DynamicType: Eq + std::hash::Hash + Clone,
115{
116 async fn get_valid_cached_client(
118 cache: &Arc<RwLock<KanidmClients>>,
119 key: &KanidmKey,
120 namespace: &str,
121 name: &str,
122 ) -> Option<Arc<KanidmClient>> {
123 let client = cache.read().await.get(key).cloned()?;
124
125 trace!(
126 msg = "check existing Kanidm client session",
127 namespace, name
128 );
129 if client.auth_valid().await.is_ok() {
130 trace!(msg = "reuse Kanidm client session", namespace, name);
131 Some(client)
132 } else {
133 None
134 }
135 }
136
137 async fn get_kanidm_client(&self, obj: &K, user: KanidmUser) -> Result<Arc<KanidmClient>> {
140 let namespace = obj.kanidm_namespace();
141 let name = obj.kanidm_name();
142 debug!(msg = "get Kanidm client", namespace, name);
143
144 let cache = match user {
145 KanidmUser::Admin => self.system_clients.clone(),
146 KanidmUser::IdmAdmin => self.idm_clients.clone(),
147 };
148 let key = KanidmKey {
149 namespace: namespace.clone(),
150 name: name.clone(),
151 };
152 if let Some(client) = Self::get_valid_cached_client(&cache, &key, &namespace, &name).await {
153 return Ok(client);
154 }
155
156 let creation_lock = self
158 .client_creation_locks
159 .write()
160 .await
161 .entry(ClientLockKey {
162 namespace: namespace.clone(),
163 name: name.clone(),
164 user: user.clone(),
165 })
166 .or_insert_with(Arc::default)
167 .clone();
168 let _guard = creation_lock.lock().await;
169
170 if let Some(client) = Self::get_valid_cached_client(&cache, &key, &namespace, &name).await {
172 return Ok(client);
173 }
174
175 match KanidmClients::create_client(&namespace, &name, user, self.client.clone()).await {
176 Ok(client) => {
177 cache.write().await.insert(key.clone(), client.clone());
178 Ok(client)
179 }
180 Err(e) => {
181 self.recorder
182 .publish(
183 &Event {
184 type_: EventType::Warning,
185 reason: "KanidmClientError".to_string(),
186 note: Some(e.to_string()),
187 action: "KanidmClientCreating".into(),
188 secondary: None,
189 },
190 &obj.object_ref(&()),
191 )
192 .await
193 .map_err(|e| {
194 error!(msg = "failed to create Kanidm client", %e);
195 Error::KubeError("failed to publish event".to_string(), Box::new(e))
196 })?;
197 Err(e)
198 }
199 }
200 }
201
202 pub fn get_kanidm(&self, obj: &K) -> Option<Arc<Kanidm>> {
206 let namespace = obj.kanidm_namespace();
207 let name = obj.kanidm_name();
208 self.kanidm_store.find(|k| {
209 kube::ResourceExt::namespace(k).as_ref() == Some(&namespace) && k.name_any() == name
210 })
211 }
212}
213
214#[allow(async_fn_in_trait)]
215pub trait BackoffContext<K: Resource> {
216 fn metrics(&self) -> &Arc<ControllerMetrics>;
217 async fn get_backoff(&self, obj_ref: ObjectRef<K>) -> Duration;
218 async fn reset_backoff(&self, obj_ref: ObjectRef<K>);
219}
220
221impl<K> BackoffContext<K> for Context<K>
222where
223 K: Resource<DynamicType = ()> + ResourceExt + Lookup + Clone + 'static,
224 <K as Lookup>::DynamicType: Eq + std::hash::Hash + Clone,
225{
226 fn metrics(&self) -> &Arc<ControllerMetrics> {
227 &self.metrics
228 }
229
230 async fn get_backoff(&self, obj_ref: ObjectRef<K>) -> Duration {
232 {
233 let read_guard = self.error_backoff_cache.read().await;
234 if let Some(backoff) = read_guard.get(&obj_ref) {
235 if let Some(duration) = backoff.write().await.next() {
236 return duration;
237 }
238 }
239 }
240
241 let mut backoff = ExponentialBuilder::default()
243 .with_max_delay(DEFAULT_RECONCILE_INTERVAL)
244 .without_max_times()
245 .build();
246 let duration = backoff.next().unwrap();
248 self.error_backoff_cache
249 .write()
250 .await
251 .insert(obj_ref.clone(), RwLock::new(backoff));
252 trace!(
253 msg = format!("recreate backoff policy"),
254 namespace = obj_ref.namespace.as_deref().unwrap(),
255 name = obj_ref.name,
256 );
257 duration
258 }
259
260 async fn reset_backoff(&self, obj_ref: ObjectRef<K>) {
262 let read_guard = self.error_backoff_cache.read().await;
263 if read_guard.get(&obj_ref).is_some() {
264 drop(read_guard);
265 trace!(
266 msg = "reset backoff policy",
267 namespace = obj_ref.namespace.as_deref().unwrap(),
268 name = obj_ref.name
269 );
270 self.error_backoff_cache.write().await.remove(&obj_ref);
271 }
272 }
273}
274
275#[allow(async_fn_in_trait)]
276pub trait IdmClientContext<K: Resource> {
277 async fn get_idm_client(&self, obj: &K) -> Result<Arc<KanidmClient>>;
278}
279
280impl<K> IdmClientContext<K> for Context<K>
281where
282 K: Resource<DynamicType = ()> + ResourceExt + KanidmResource + Lookup + Clone + 'static,
283 <K as Lookup>::DynamicType: Eq + std::hash::Hash + Clone,
284{
285 async fn get_idm_client(&self, obj: &K) -> Result<Arc<KanidmClient>> {
286 self.get_kanidm_client(obj, KanidmUser::IdmAdmin).await
287 }
288}
289
290#[allow(async_fn_in_trait)]
291pub trait SystemClientContext<K: Resource> {
292 async fn get_system_client(&self, obj: &K) -> Result<Arc<KanidmClient>>;
293}
294
295impl<K> SystemClientContext<K> for Context<K>
296where
297 K: Resource<DynamicType = ()> + ResourceExt + KanidmResource + Lookup + Clone + 'static,
298 <K as Lookup>::DynamicType: Eq + std::hash::Hash + Clone,
299{
300 async fn get_system_client(&self, obj: &K) -> Result<Arc<KanidmClient>> {
301 self.get_kanidm_client(obj, KanidmUser::Admin).await
302 }
303}
304
305#[allow(async_fn_in_trait)]
306pub trait KubeOperations<T, K>
307where
308 T: Resource + ResourceExt + Lookup + Clone + 'static,
309 <T as Lookup>::DynamicType: Eq + std::hash::Hash + Clone,
310 K: Resource<Scope = NamespaceResourceScope>
311 + Serialize
312 + Clone
313 + std::fmt::Debug
314 + for<'de> Deserialize<'de>,
315 <K as kube::Resource>::DynamicType: Default,
316 <K as Resource>::Scope: std::marker::Sized,
317{
318 async fn kube_delete(&self, client: Client, metrics: &ControllerMetrics, obj: &K)
319 -> Result<()>;
320 async fn kube_patch(
321 &self,
322 client: Client,
323 metrics: &ControllerMetrics,
324 obj: K,
325 operator_name: &str,
326 ) -> Result<K>;
327}
328
329impl<T, K> KubeOperations<T, K> for T
330where
331 T: Resource + ResourceExt + Lookup + Clone + 'static,
332 <T as Lookup>::DynamicType: Eq + std::hash::Hash + Clone,
333 K: Resource<Scope = NamespaceResourceScope>
334 + Serialize
335 + Clone
336 + std::fmt::Debug
337 + for<'de> Deserialize<'de>,
338 <K as kube::Resource>::DynamicType: Default,
339 <K as Resource>::Scope: std::marker::Sized,
340{
341 async fn kube_delete(
342 &self,
343 client: Client,
344 _metrics: &ControllerMetrics,
345 obj: &K,
346 ) -> Result<()> {
347 let name = obj.name_any();
348 let namespace = kube::ResourceExt::namespace(self).unwrap();
350 trace!(
351 msg = format!("deleting {}", short_type_name::<K>().unwrap_or("Unknown")),
352 resource.name = &name,
353 resource.namespace = &namespace
354 );
355 let api = Api::<K>::namespaced(client, &namespace);
356 api.delete(&name, &Default::default()).await.map_err(|e| {
357 Error::KubeError(
358 format!(
359 "failed to delete {} {namespace}/{name}",
360 short_type_name::<K>().unwrap_or("Unknown")
361 ),
362 Box::new(e),
363 )
364 })?;
365 Ok(())
366 }
367
368 async fn kube_patch(
369 &self,
370 client: Client,
371 metrics: &ControllerMetrics,
372 obj: K,
373 operator_name: &str,
374 ) -> Result<K> {
375 let name = obj.name_any();
376 let namespace = kube::ResourceExt::namespace(self).unwrap();
378 trace!(
379 msg = format!("patching {}", short_type_name::<K>().unwrap_or("Unknown")),
380 resource.name = &name,
381 resource.namespace = &namespace
382 );
383 let resource_api = Api::<K>::namespaced(client.clone(), &namespace);
384
385 let result = resource_api
386 .patch(
387 &name,
388 &PatchParams::apply(operator_name).force(),
389 &Patch::Apply(&obj),
390 )
391 .await;
392 match result {
393 Ok(resource) => Ok(resource),
394 Err(e) => match e {
395 kube::Error::Api(ae) if ae.code == 422 => {
396 info!(
397 msg = format!(
398 "recreating {} because the update operation was not possible",
399 short_type_name::<K>().unwrap_or("Unknown")
400 ),
401 reason = ae.reason
402 );
403 trace!(msg = "operation was not possible because of 422", ?ae);
404 self.kube_delete(client.clone(), metrics, &obj).await?;
405 metrics.reconcile_deploy_delete_create_inc();
406 resource_api
407 .patch(
408 &name,
409 &PatchParams::apply(operator_name).force(),
410 &Patch::Apply(&obj),
411 )
412 .await
413 .map_err(|e| {
414 Error::KubeError(
415 format!(
416 "failed to re-try patch {} {namespace}/{name}",
417 short_type_name::<K>().unwrap_or("Unknown")
418 ),
419 Box::new(e),
420 )
421 })
422 }
423 _ => Err(Error::KubeError(
424 format!(
425 "failed to patch {} {namespace}/{name}",
426 short_type_name::<K>().unwrap_or("Unknown")
427 ),
428 Box::new(e),
429 )),
430 },
431 }
432 }
433}