kaniop_operator/controller/
mod.rs

1pub mod context;
2pub mod kanidm;
3
4use self::{context::Context, kanidm::KanidmClients};
5
6use crate::kanidm::crd::Kanidm;
7use crate::metrics;
8use kaniop_k8s_util::error::{Error, Result};
9
10use kaniop_k8s_util::types::short_type_name;
11
12use std::fmt::Debug;
13use std::sync::Arc;
14
15use futures::channel::mpsc;
16use futures::future::BoxFuture;
17use futures::{FutureExt, StreamExt};
18use k8s_openapi::api::core::v1::Namespace;
19use kube::Resource;
20use kube::api::{Api, ListParams, PartialObjectMeta, ResourceExt};
21use kube::client::Client;
22use kube::runtime::controller::Action;
23use kube::runtime::events::Recorder;
24use kube::runtime::reflector::store::Writer;
25use kube::runtime::reflector::{self, Lookup, ReflectHandle, Store};
26use kube::runtime::{WatchStreamExt, metadata_watcher, watcher};
27use prometheus_client::registry::Registry;
28use serde::de::DeserializeOwned;
29use tokio::sync::RwLock;
30use tokio::time::Duration;
31use tracing::{debug, error, trace};
32
/// Default reconcile interval: 60 seconds.
// NOTE(review): not referenced in this part of the file; presumably the requeue period
// controllers use after a successful reconcile — confirm against callers.
pub const DEFAULT_RECONCILE_INTERVAL: Duration = Duration::from_secs(60);
/// Buffer size (256); presumably the `buffer_size` given to [`create_subscriber`] — confirm
/// against callers.
pub const SUBSCRIBE_BUFFER_SIZE: usize = 256;
/// Buffer size (16); presumably the capacity of the `reload_tx` channel handed to the watchers —
/// confirm against callers.
pub const RELOAD_BUFFER_SIZE: usize = 16;
/// Label key `app.kubernetes.io/name`.
pub const NAME_LABEL: &str = "app.kubernetes.io/name";
/// Label key `app.kubernetes.io/instance`.
pub const INSTANCE_LABEL: &str = "app.kubernetes.io/instance";
/// Label key `app.kubernetes.io/managed-by`. The watchers in this module only select resources
/// whose value for this key is `kaniop-<controller_id>`.
pub const MANAGED_BY_LABEL: &str = "app.kubernetes.io/managed-by";

/// Identifier of a controller. It is used to look up the controller's metrics, to name its event
/// recorder and to build the `managed-by` label selector of its watchers.
pub type ControllerId = &'static str;
41
/// State shared between the controller and the web server.
///
/// The metrics and both Kanidm client caches are held behind `Arc`, so every clone of `State`
/// refers to the same underlying values.
#[derive(Clone)]
pub struct State {
    /// Metrics
    metrics: Arc<metrics::Metrics>,
    /// Shared Kanidm cache clients with the ability to manage users and their groups
    idm_clients: Arc<RwLock<KanidmClients>>,
    /// Shared Kanidm cache clients with the ability to manage the operation of Kanidm as a
    /// database and service
    system_clients: Arc<RwLock<KanidmClients>>,
    /// Cache for Namespace resources
    pub namespace_store: Store<Namespace>,
    /// Cache for Kanidm resources
    pub kanidm_store: Store<Kanidm>,
    /// Kubernetes client
    // NOTE(review): why this is optional is not visible here; presumably `None` is used when no
    // cluster connection is available (e.g. in tests) — confirm against callers.
    pub client: Option<Client>,
}
60
/// Shared state for a resource stream.
///
/// Groups the three handles of a shared reflector for resource `K`, as produced by
/// [`create_subscriber`].
pub struct ResourceReflector<K>
where
    K: Resource + Lookup + Clone + 'static,
    <K as Lookup>::DynamicType: Eq + std::hash::Hash + Clone,
{
    /// Read handle to the reflector cache.
    pub store: Store<K>,
    /// Write half of the cache; the watchers in this module feed it through `reflect_shared`.
    pub writer: Writer<K>,
    /// Subscription handle obtained from `writer.subscribe()`.
    pub subscriber: ReflectHandle<K>,
}
71
72/// State wrapper around the controller outputs for the web server
73impl State {
74    pub fn new(
75        registry: Registry,
76        controller_names: &[&'static str],
77        namespace_store: Store<Namespace>,
78        kanidm_store: Store<Kanidm>,
79        client: Option<Client>,
80    ) -> Self {
81        Self {
82            metrics: Arc::new(metrics::Metrics::new(registry, controller_names)),
83            idm_clients: Arc::default(),
84            system_clients: Arc::default(),
85            namespace_store,
86            kanidm_store,
87            client,
88        }
89    }
90
91    /// Metrics getter
92    pub fn metrics(&self) -> Result<String> {
93        let mut buffer = String::new();
94        let registry = &*self.metrics.registry;
95        prometheus_client::encoding::text::encode(&mut buffer, registry)
96            .map_err(|e| Error::FormattingError("failed to encode metrics".to_string(), e))?;
97        Ok(buffer)
98    }
99
100    /// Create a Controller Context that can update State
101    pub fn to_context<K>(&self, client: Client, controller_id: ControllerId) -> Context<K>
102    where
103        K: Resource + Lookup + Clone + 'static,
104        <K as Lookup>::DynamicType: Default + Eq + std::hash::Hash + Clone,
105    {
106        Context::new(
107            controller_id,
108            client.clone(),
109            self.metrics
110                .controllers
111                .get(controller_id)
112                .expect("all CONTROLLER_IDs have to be registered")
113                .clone(),
114            Recorder::new(client.clone(), controller_id.into()),
115            self.idm_clients.clone(),
116            self.system_clients.clone(),
117            self.namespace_store.clone(),
118            self.kanidm_store.clone(),
119        )
120    }
121}
122
123pub async fn check_api_queryable<K>(client: Client) -> Api<K>
124where
125    K: Resource + Clone + DeserializeOwned + Debug,
126    <K as Resource>::DynamicType: Default,
127{
128    let api = Api::<K>::all(client.clone());
129    if let Err(e) = api.list(&ListParams::default().limit(1)).await {
130        error!(
131            "{} is not queryable; {e:?}. Check controller permissions",
132            short_type_name::<K>().unwrap_or("Unknown resource"),
133        );
134        std::process::exit(1);
135    }
136    api
137}
138
139pub fn create_subscriber<K>(buffer_size: usize) -> ResourceReflector<K>
140where
141    K: Resource + Lookup + Clone + 'static,
142    <K as Lookup>::DynamicType: Default + Eq + std::hash::Hash + Clone,
143    <K as Resource>::DynamicType: Default + Eq + std::hash::Hash + Clone,
144{
145    let (store, writer) = reflector::store_shared::<K>(buffer_size);
146    let subscriber = writer
147        .subscribe()
148        .expect("subscribers can only be created from shared stores");
149
150    ResourceReflector {
151        store,
152        writer,
153        subscriber,
154    }
155}
156
157fn create_generic_watcher<K, W, T, S, StreamT>(
158    api: Api<K>,
159    writer: Writer<W>,
160    reload_tx: mpsc::Sender<()>,
161    controller_id: ControllerId,
162    ctx: Arc<Context<T>>,
163    stream_fn: S,
164) -> BoxFuture<'static, ()>
165where
166    K: Resource + Lookup + Clone + DeserializeOwned + Send + Sync + Debug + 'static,
167    W: Resource + ResourceExt + Lookup + Clone + Debug + Send + Sync + 'static,
168    <W as Resource>::DynamicType: Eq + std::hash::Hash + Clone + Send + Sync,
169    <K as Lookup>::DynamicType: Default + Eq + std::hash::Hash + Clone + Send + Sync,
170    <K as Resource>::DynamicType: Default + Eq + std::hash::Hash + Clone,
171    <W as Lookup>::DynamicType: Eq + std::hash::Hash + Clone + Send + Sync,
172    T: Resource<DynamicType = ()> + ResourceExt + Lookup + Clone + 'static,
173    <T as Lookup>::DynamicType: Eq + std::hash::Hash + Clone + Send + Sync,
174    S: Fn(Api<K>, watcher::Config) -> StreamT + 'static,
175    StreamT: futures::Stream<Item = Result<watcher::Event<W>, kube::runtime::watcher::Error>>
176        + Send
177        + 'static,
178{
179    let resource_name = short_type_name::<K>().unwrap_or("Unknown");
180
181    stream_fn(
182        api,
183        watcher::Config::default().labels(&format!("{MANAGED_BY_LABEL}=kaniop-{controller_id}")),
184    )
185    .default_backoff()
186    .reflect_shared(writer)
187    .for_each(move |res| {
188        let mut reload_tx_clone = reload_tx.clone();
189        let ctx = ctx.clone();
190        async move {
191            match res {
192                Ok(event) => {
193                    trace!(msg = "watched event", ?event);
194                    match event {
195                        watcher::Event::Delete(d) => {
196                            debug!(
197                                msg = format!("delete event for {resource_name} trigger reconcile"),
198                                namespace = ResourceExt::namespace(&d).unwrap(),
199                                name = d.name_any()
200                            );
201
202                            // TODO: remove for each trigger on delete logic when
203                            // (dispatch delete events issue)[https://github.com/kube-rs/kube/issues/1590]
204                            // is solved
205                            let _ignore_errors = reload_tx_clone.try_send(()).map_err(
206                                |e| error!(msg = "failed to trigger reconcile on delete", %e),
207                            );
208                            ctx.metrics
209                                .triggered_inc(metrics::Action::Delete, resource_name);
210                        }
211                        watcher::Event::Apply(d) => {
212                            debug!(
213                                msg = format!("apply event for {resource_name} trigger reconcile"),
214                                namespace = ResourceExt::namespace(&d).unwrap(),
215                                name = d.name_any()
216                            );
217                            ctx.metrics
218                                .triggered_inc(metrics::Action::Apply, resource_name);
219                        }
220                        _ => {}
221                    }
222                }
223                Err(e) => {
224                    error!(msg = format!("unexpected error when watching {resource_name}"), %e);
225                    ctx.metrics.watch_operations_failed_inc();
226                }
227            }
228        }
229    })
230    .boxed()
231}
232
/// Creates the watcher future for full `K` objects.
///
/// Thin wrapper around `create_generic_watcher` that uses `kube::runtime::watcher` as the stream
/// source, so `writer` receives complete `K` objects. Label filtering, reload triggering on
/// delete events and metrics are handled by `create_generic_watcher`.
pub fn create_watcher<K, T>(
    api: Api<K>,
    writer: Writer<K>,
    reload_tx: mpsc::Sender<()>,
    controller_id: ControllerId,
    ctx: Arc<Context<T>>,
) -> BoxFuture<'static, ()>
where
    K: Resource + Lookup + Clone + DeserializeOwned + Send + Sync + Debug + 'static,
    <K as Lookup>::DynamicType: Default + Eq + std::hash::Hash + Clone + Send + Sync,
    <K as Resource>::DynamicType: Default + Eq + std::hash::Hash + Clone,
    T: Resource<DynamicType = ()> + ResourceExt + Lookup + Clone + 'static,
    <T as Lookup>::DynamicType: Eq + std::hash::Hash + Clone + Send + Sync,
{
    create_generic_watcher::<K, K, T, _, _>(api, writer, reload_tx, controller_id, ctx, watcher)
}
249
/// Creates the watcher future for the metadata of `K` objects.
///
/// Thin wrapper around `create_generic_watcher` that uses `kube::runtime::metadata_watcher` as
/// the stream source, so `writer` receives `PartialObjectMeta<K>` values instead of complete
/// objects. Label filtering, reload triggering on delete events and metrics are handled by
/// `create_generic_watcher`.
pub fn create_metadata_watcher<K, T>(
    api: Api<K>,
    writer: Writer<PartialObjectMeta<K>>,
    reload_tx: mpsc::Sender<()>,
    controller_id: ControllerId,
    ctx: Arc<Context<T>>,
) -> BoxFuture<'static, ()>
where
    K: Resource + Lookup + Clone + DeserializeOwned + Send + Sync + Debug + 'static,
    <K as Lookup>::DynamicType: Default + Eq + std::hash::Hash + Clone + Send + Sync,
    <K as Resource>::DynamicType: Default + Eq + std::hash::Hash + Clone,
    T: Resource<DynamicType = ()> + ResourceExt + Lookup + Clone + 'static,
    <T as Lookup>::DynamicType: Eq + std::hash::Hash + Clone + Send + Sync,
{
    create_generic_watcher::<K, PartialObjectMeta<K>, T, _, _>(
        api,
        writer,
        reload_tx,
        controller_id,
        ctx,
        metadata_watcher,
    )
}
273
/// Error policy placeholder that is never expected to run.
///
/// Reconcilers wrapped with the `backoff_reconciler!` macro turn every reconcile error into
/// `Ok(Action::requeue(..))`, so no error should reach the error policy.
///
/// # Panics
///
/// Always panics (via `unreachable!`) if it is called.
pub fn error_policy<K>(_obj: Arc<K>, _error: &Error, _ctx: Arc<Context<K>>) -> Action
where
    K: Resource + Lookup + Clone + 'static,
    <K as Lookup>::DynamicType: Default + Eq + std::hash::Hash + Clone,
{
    unreachable!("Handle in backoff_reconciler macro")
}
281
/// Wraps a reconciler function into a closure that never returns an error.
///
/// On success the backoff state of the object is reset and the reconciler's action is returned.
/// On failure the error is logged, the reconcile failure metric is incremented and the object is
/// requeued after the backoff duration obtained from the context.
///
/// The expansion refers to `kube` and `tracing` by path, so both crates must be resolvable at the
/// call site.
#[macro_export]
macro_rules! backoff_reconciler {
    ($inner_reconciler:ident) => {
        |obj, ctx| async move {
            use $crate::controller::context::BackoffContext;

            let outcome = $inner_reconciler(obj.clone(), ctx.clone()).await;
            match outcome {
                Err(error) => {
                    // safe unwrap: all resources in the operator are namespace scoped resources
                    let namespace = kube::ResourceExt::namespace(obj.as_ref()).unwrap();
                    let name = kube::ResourceExt::name_any(obj.as_ref());
                    tracing::error!(msg = "failed reconciliation", %namespace, %name, %error);
                    ctx.metrics().reconcile_failure_inc();

                    let object_ref = kube::runtime::reflector::ObjectRef::from(obj.as_ref());
                    let backoff_duration = ctx.get_backoff(object_ref).await;
                    tracing::trace!(
                        msg = format!("backoff duration: {backoff_duration:?}"),
                        %namespace,
                        %name,
                    );
                    Ok(kube::runtime::controller::Action::requeue(backoff_duration))
                }
                Ok(action) => {
                    // A successful reconcile clears any accumulated backoff for this object.
                    let object_ref = kube::runtime::reflector::ObjectRef::from(obj.as_ref());
                    ctx.reset_backoff(object_ref).await;
                    Ok(action)
                }
            }
        }
    };
}