// kaniop_operator/controller/context.rs
1use super::{
2    ControllerId, DEFAULT_RECONCILE_INTERVAL, KanidmClients,
3    kanidm::{ClientLockKey, KanidmKey, KanidmResource, KanidmUser},
4};
5
6use crate::kanidm::crd::Kanidm;
7use crate::metrics::ControllerMetrics;
8use kaniop_k8s_util::error::{Error, Result};
9
10use kaniop_k8s_util::types::short_type_name;
11
12use std::collections::HashMap;
13use std::sync::Arc;
14
15use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder};
16use k8s_openapi::{NamespaceResourceScope, api::core::v1::Namespace};
17use kanidm_client::KanidmClient;
18use kube::runtime::events::{Event, EventType, Recorder};
19use kube::{Api, client::Client};
20use kube::{Resource, ResourceExt};
21use kube::{
22    api::{Patch, PatchParams},
23    runtime::reflector::{Lookup, ObjectRef, Store},
24};
25use serde::{Deserialize, Serialize};
26use tokio::sync::{Mutex, RwLock};
27use tokio::time::Duration;
28use tracing::{debug, error, info, trace};
29
/// Context for our reconciler
///
/// Cheaply clonable bundle of shared state (Kubernetes client, caches, metrics,
/// event recorder, Kanidm client caches) handed to every reconcile call of a
/// controller handling resource type `K`.
#[derive(Clone)]
pub struct Context<K: Resource> {
    /// Controller ID
    pub controller_id: ControllerId,
    /// Kubernetes client
    pub client: Client,
    /// Prometheus metrics
    pub metrics: Arc<ControllerMetrics>,
    /// State of the error backoff policy per object; entries are created lazily on error
    /// and removed again when the backoff is reset
    error_backoff_cache: Arc<RwLock<HashMap<ObjectRef<K>, RwLock<ExponentialBackoff>>>>,
    /// Event recorder
    pub recorder: Recorder,
    /// Cache for Namespace resources
    pub namespace_store: Store<Namespace>,
    /// Cache for Kanidm resources
    pub kanidm_store: Store<Kanidm>,
    /// Shared Kanidm cache clients with the ability to manage users and their groups
    /// (used for `KanidmUser::IdmAdmin`)
    idm_clients: Arc<RwLock<KanidmClients>>,
    /// Shared Kanidm cache clients with the ability to manage the operation of Kanidm as a
    /// database and service (used for `KanidmUser::Admin`)
    system_clients: Arc<RwLock<KanidmClients>>,
    /// Locks for client creation to prevent thundering herd problem
    client_creation_locks: Arc<RwLock<HashMap<ClientLockKey, Arc<Mutex<()>>>>>,
}
55
56impl<K> Context<K>
57where
58    K: Resource + ResourceExt + Lookup + Clone + 'static,
59    <K as Lookup>::DynamicType: Eq + std::hash::Hash + Clone,
60{
61    #[allow(clippy::too_many_arguments)]
62    pub fn new(
63        controller_id: ControllerId,
64        client: Client,
65        metrics: Arc<ControllerMetrics>,
66        recorder: Recorder,
67        idm_clients: Arc<RwLock<KanidmClients>>,
68        system_clients: Arc<RwLock<KanidmClients>>,
69        namespace_store: Store<Namespace>,
70        kanidm_store: Store<Kanidm>,
71    ) -> Self {
72        Self {
73            controller_id,
74            client,
75            metrics,
76            recorder,
77            namespace_store,
78            kanidm_store,
79            idm_clients,
80            system_clients,
81            error_backoff_cache: Arc::default(),
82            client_creation_locks: Arc::default(),
83        }
84    }
85
86    pub async fn release_kanidm_clients(&self, kanidm: &Kanidm) {
87        let key = KanidmKey {
88            // safe unwrap: Kanidm is namespaced scoped
89            namespace: kube::ResourceExt::namespace(kanidm).unwrap(),
90            name: kanidm.name_any(),
91        };
92
93        self.idm_clients.write().await.remove(&key);
94        self.system_clients.write().await.remove(&key);
95        {
96            let mut locks = self.client_creation_locks.write().await;
97            locks.remove(&ClientLockKey {
98                namespace: key.namespace.clone(),
99                name: key.name.clone(),
100                user: KanidmUser::IdmAdmin,
101            });
102            locks.remove(&ClientLockKey {
103                namespace: key.namespace.clone(),
104                name: key.name.clone(),
105                user: KanidmUser::Admin,
106            });
107        }
108    }
109}
110
111impl<K> Context<K>
112where
113    K: Resource<DynamicType = ()> + ResourceExt + KanidmResource + Lookup + Clone + 'static,
114    <K as Lookup>::DynamicType: Eq + std::hash::Hash + Clone,
115{
116    /// Check if a valid client exists in cache
117    async fn get_valid_cached_client(
118        cache: &Arc<RwLock<KanidmClients>>,
119        key: &KanidmKey,
120        namespace: &str,
121        name: &str,
122    ) -> Option<Arc<KanidmClient>> {
123        let client = cache.read().await.get(key).cloned()?;
124
125        trace!(
126            msg = "check existing Kanidm client session",
127            namespace, name
128        );
129        if client.auth_valid().await.is_ok() {
130            trace!(msg = "reuse Kanidm client session", namespace, name);
131            Some(client)
132        } else {
133            None
134        }
135    }
136
137    /// Return a valid client for the Kanidm cluster. This operation require to do at least a
138    /// request for validating the client, use it wisely.
139    async fn get_kanidm_client(&self, obj: &K, user: KanidmUser) -> Result<Arc<KanidmClient>> {
140        let namespace = obj.kanidm_namespace();
141        let name = obj.kanidm_name();
142        debug!(msg = "get Kanidm client", namespace, name);
143
144        let cache = match user {
145            KanidmUser::Admin => self.system_clients.clone(),
146            KanidmUser::IdmAdmin => self.idm_clients.clone(),
147        };
148        let key = KanidmKey {
149            namespace: namespace.clone(),
150            name: name.clone(),
151        };
152        if let Some(client) = Self::get_valid_cached_client(&cache, &key, &namespace, &name).await {
153            return Ok(client);
154        }
155
156        // Slow path: acquire lock for this specific client to prevent concurrent creation
157        let creation_lock = self
158            .client_creation_locks
159            .write()
160            .await
161            .entry(ClientLockKey {
162                namespace: namespace.clone(),
163                name: name.clone(),
164                user: user.clone(),
165            })
166            .or_insert_with(Arc::default)
167            .clone();
168        let _guard = creation_lock.lock().await;
169
170        // Double-check: another task may have created the client while we waited for the lock
171        if let Some(client) = Self::get_valid_cached_client(&cache, &key, &namespace, &name).await {
172            return Ok(client);
173        }
174
175        match KanidmClients::create_client(&namespace, &name, user, self.client.clone()).await {
176            Ok(client) => {
177                cache.write().await.insert(key.clone(), client.clone());
178                Ok(client)
179            }
180            Err(e) => {
181                self.recorder
182                    .publish(
183                        &Event {
184                            type_: EventType::Warning,
185                            reason: "KanidmClientError".to_string(),
186                            note: Some(e.to_string()),
187                            action: "KanidmClientCreating".into(),
188                            secondary: None,
189                        },
190                        &obj.object_ref(&()),
191                    )
192                    .await
193                    .map_err(|e| {
194                        error!(msg = "failed to create Kanidm client", %e);
195                        Error::KubeError("failed to publish event".to_string(), Box::new(e))
196                    })?;
197                Err(e)
198            }
199        }
200    }
201
202    /// Return [`Kanidm`] of the given object
203    ///
204    /// [`Kanidm`]: struct.Kanidm.html
205    pub fn get_kanidm(&self, obj: &K) -> Option<Arc<Kanidm>> {
206        let namespace = obj.kanidm_namespace();
207        let name = obj.kanidm_name();
208        self.kanidm_store.find(|k| {
209            kube::ResourceExt::namespace(k).as_ref() == Some(&namespace) && k.name_any() == name
210        })
211    }
212}
213
/// Per-object exponential error backoff, used to space out reconcile retries.
#[allow(async_fn_in_trait)]
pub trait BackoffContext<K: Resource> {
    /// Access the controller's Prometheus metrics handle.
    fn metrics(&self) -> &Arc<ControllerMetrics>;
    /// Return the next retry delay for `obj_ref`, advancing its backoff state.
    async fn get_backoff(&self, obj_ref: ObjectRef<K>) -> Duration;
    /// Clear the stored backoff state for `obj_ref` (e.g. after a successful reconcile).
    async fn reset_backoff(&self, obj_ref: ObjectRef<K>);
}
220
221impl<K> BackoffContext<K> for Context<K>
222where
223    K: Resource<DynamicType = ()> + ResourceExt + Lookup + Clone + 'static,
224    <K as Lookup>::DynamicType: Eq + std::hash::Hash + Clone,
225{
226    fn metrics(&self) -> &Arc<ControllerMetrics> {
227        &self.metrics
228    }
229
230    /// Return next duration of the backoff policy for the given object
231    async fn get_backoff(&self, obj_ref: ObjectRef<K>) -> Duration {
232        {
233            let read_guard = self.error_backoff_cache.read().await;
234            if let Some(backoff) = read_guard.get(&obj_ref) {
235                if let Some(duration) = backoff.write().await.next() {
236                    return duration;
237                }
238            }
239        }
240
241        // Backoff policy: 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 300s, 300s...
242        let mut backoff = ExponentialBuilder::default()
243            .with_max_delay(DEFAULT_RECONCILE_INTERVAL)
244            .without_max_times()
245            .build();
246        // safe unwrap: first backoff is always Some(Duration)
247        let duration = backoff.next().unwrap();
248        self.error_backoff_cache
249            .write()
250            .await
251            .insert(obj_ref.clone(), RwLock::new(backoff));
252        trace!(
253            msg = format!("recreate backoff policy"),
254            namespace = obj_ref.namespace.as_deref().unwrap(),
255            name = obj_ref.name,
256        );
257        duration
258    }
259
260    /// Reset the backoff policy for the given object
261    async fn reset_backoff(&self, obj_ref: ObjectRef<K>) {
262        let read_guard = self.error_backoff_cache.read().await;
263        if read_guard.get(&obj_ref).is_some() {
264            drop(read_guard);
265            trace!(
266                msg = "reset backoff policy",
267                namespace = obj_ref.namespace.as_deref().unwrap(),
268                name = obj_ref.name
269            );
270            self.error_backoff_cache.write().await.remove(&obj_ref);
271        }
272    }
273}
274
/// Access to a Kanidm client for identity-management operations.
#[allow(async_fn_in_trait)]
pub trait IdmClientContext<K: Resource> {
    /// Return a validated Kanidm client for the cluster referenced by `obj`.
    async fn get_idm_client(&self, obj: &K) -> Result<Arc<KanidmClient>>;
}
279
280impl<K> IdmClientContext<K> for Context<K>
281where
282    K: Resource<DynamicType = ()> + ResourceExt + KanidmResource + Lookup + Clone + 'static,
283    <K as Lookup>::DynamicType: Eq + std::hash::Hash + Clone,
284{
285    async fn get_idm_client(&self, obj: &K) -> Result<Arc<KanidmClient>> {
286        self.get_kanidm_client(obj, KanidmUser::IdmAdmin).await
287    }
288}
289
/// Access to a Kanidm client for system/service-level operations.
#[allow(async_fn_in_trait)]
pub trait SystemClientContext<K: Resource> {
    /// Return a validated Kanidm client for the cluster referenced by `obj`.
    async fn get_system_client(&self, obj: &K) -> Result<Arc<KanidmClient>>;
}
294
295impl<K> SystemClientContext<K> for Context<K>
296where
297    K: Resource<DynamicType = ()> + ResourceExt + KanidmResource + Lookup + Clone + 'static,
298    <K as Lookup>::DynamicType: Eq + std::hash::Hash + Clone,
299{
300    async fn get_system_client(&self, obj: &K) -> Result<Arc<KanidmClient>> {
301        self.get_kanidm_client(obj, KanidmUser::Admin).await
302    }
303}
304
/// Generic delete/patch helpers for namespaced Kubernetes resources of type `K`,
/// implemented for any namespaced owner resource `T`.
#[allow(async_fn_in_trait)]
pub trait KubeOperations<T, K>
where
    T: Resource + ResourceExt + Lookup + Clone + 'static,
    <T as Lookup>::DynamicType: Eq + std::hash::Hash + Clone,
    K: Resource<Scope = NamespaceResourceScope>
        + Serialize
        + Clone
        + std::fmt::Debug
        + for<'de> Deserialize<'de>,
    <K as kube::Resource>::DynamicType: Default,
    <K as Resource>::Scope: std::marker::Sized,
{
    /// Delete `obj` in the namespace of `self`.
    async fn kube_delete(&self, client: Client, metrics: &ControllerMetrics, obj: &K)
    -> Result<()>;
    /// Server-side apply `obj` in the namespace of `self`; on a 422 response the
    /// object is deleted and re-applied.
    async fn kube_patch(
        &self,
        client: Client,
        metrics: &ControllerMetrics,
        obj: K,
        operator_name: &str,
    ) -> Result<K>;
}
328
329impl<T, K> KubeOperations<T, K> for T
330where
331    T: Resource + ResourceExt + Lookup + Clone + 'static,
332    <T as Lookup>::DynamicType: Eq + std::hash::Hash + Clone,
333    K: Resource<Scope = NamespaceResourceScope>
334        + Serialize
335        + Clone
336        + std::fmt::Debug
337        + for<'de> Deserialize<'de>,
338    <K as kube::Resource>::DynamicType: Default,
339    <K as Resource>::Scope: std::marker::Sized,
340{
341    async fn kube_delete(
342        &self,
343        client: Client,
344        _metrics: &ControllerMetrics,
345        obj: &K,
346    ) -> Result<()> {
347        let name = obj.name_any();
348        // safe unwrap: self is namespaced scoped
349        let namespace = kube::ResourceExt::namespace(self).unwrap();
350        trace!(
351            msg = format!("deleting {}", short_type_name::<K>().unwrap_or("Unknown")),
352            resource.name = &name,
353            resource.namespace = &namespace
354        );
355        let api = Api::<K>::namespaced(client, &namespace);
356        api.delete(&name, &Default::default()).await.map_err(|e| {
357            Error::KubeError(
358                format!(
359                    "failed to delete {} {namespace}/{name}",
360                    short_type_name::<K>().unwrap_or("Unknown")
361                ),
362                Box::new(e),
363            )
364        })?;
365        Ok(())
366    }
367
368    async fn kube_patch(
369        &self,
370        client: Client,
371        metrics: &ControllerMetrics,
372        obj: K,
373        operator_name: &str,
374    ) -> Result<K> {
375        let name = obj.name_any();
376        // safe unwrap: self is namespaced scoped
377        let namespace = kube::ResourceExt::namespace(self).unwrap();
378        trace!(
379            msg = format!("patching {}", short_type_name::<K>().unwrap_or("Unknown")),
380            resource.name = &name,
381            resource.namespace = &namespace
382        );
383        let resource_api = Api::<K>::namespaced(client.clone(), &namespace);
384
385        let result = resource_api
386            .patch(
387                &name,
388                &PatchParams::apply(operator_name).force(),
389                &Patch::Apply(&obj),
390            )
391            .await;
392        match result {
393            Ok(resource) => Ok(resource),
394            Err(e) => match e {
395                kube::Error::Api(ae) if ae.code == 422 => {
396                    info!(
397                        msg = format!(
398                            "recreating {} because the update operation was not possible",
399                            short_type_name::<K>().unwrap_or("Unknown")
400                        ),
401                        reason = ae.reason
402                    );
403                    trace!(msg = "operation was not possible because of 422", ?ae);
404                    self.kube_delete(client.clone(), metrics, &obj).await?;
405                    metrics.reconcile_deploy_delete_create_inc();
406                    resource_api
407                        .patch(
408                            &name,
409                            &PatchParams::apply(operator_name).force(),
410                            &Patch::Apply(&obj),
411                        )
412                        .await
413                        .map_err(|e| {
414                            Error::KubeError(
415                                format!(
416                                    "failed to re-try patch {} {namespace}/{name}",
417                                    short_type_name::<K>().unwrap_or("Unknown")
418                                ),
419                                Box::new(e),
420                            )
421                        })
422                }
423                _ => Err(Error::KubeError(
424                    format!(
425                        "failed to patch {} {namespace}/{name}",
426                        short_type_name::<K>().unwrap_or("Unknown")
427                    ),
428                    Box::new(e),
429                )),
430            },
431        }
432    }
433}