//! kaniop_operator/controller/mod.rs — shared controller state, watchers and reconcile plumbing.

1pub mod context;
2pub mod kanidm;
3
4use self::{context::Context, kanidm::KanidmClients};
5
6use crate::kanidm::crd::Kanidm;
7use crate::metrics;
8use crate::prometheus_exporter;
9
10use kaniop_k8s_util::error::{Error, Result};
11
12use kaniop_k8s_util::types::short_type_name;
13
14use std::fmt::Debug;
15use std::sync::Arc;
16
17use futures::channel::mpsc;
18use futures::future::BoxFuture;
19use futures::{FutureExt, StreamExt};
20use k8s_openapi::api::core::v1::Namespace;
21use kube::Resource;
22use kube::api::{Api, ListParams, PartialObjectMeta, ResourceExt};
23use kube::client::Client;
24use kube::runtime::controller::Action;
25use kube::runtime::events::Recorder;
26use kube::runtime::reflector::store::Writer;
27use kube::runtime::reflector::{self, Lookup, ReflectHandle, Store};
28use kube::runtime::{WatchStreamExt, metadata_watcher, watcher};
29use serde::de::DeserializeOwned;
30use tokio::sync::RwLock;
31use tokio::time::Duration;
32use tracing::{debug, error, trace};
33
/// Default requeue interval for periodic reconciliation (60s).
pub const DEFAULT_RECONCILE_INTERVAL: Duration = Duration::from_secs(60);
/// Channel capacity for shared-store subscriber streams (see `create_subscriber`).
pub const SUBSCRIBE_BUFFER_SIZE: usize = 256;
/// Channel capacity for the reload channel used to force reconciles on delete events.
pub const RELOAD_BUFFER_SIZE: usize = 16;
/// Standard Kubernetes recommended label: resource name.
pub const NAME_LABEL: &str = "app.kubernetes.io/name";
/// Standard Kubernetes recommended label: instance name.
pub const INSTANCE_LABEL: &str = "app.kubernetes.io/instance";
/// Label watchers filter on to select resources managed by a kaniop controller
/// (`{MANAGED_BY_LABEL}=kaniop-{controller_id}`).
pub const MANAGED_BY_LABEL: &str = "app.kubernetes.io/managed-by";

/// Identifier of a controller; used as the metrics registry key and as the
/// event recorder's reporter name.
pub type ControllerId = &'static str;
42
/// State shared between the controller and the web server
// Kanidm defined as a generic because it causes a cycle dependency with the kaniop_kanidm crate
#[derive(Clone)]
pub struct State {
    /// Metrics
    metrics: Arc<metrics::Metrics>,
    /// Shared Kanidm cache clients with the ability to manage users and their groups
    idm_clients: Arc<RwLock<KanidmClients>>,
    /// Shared Kanidm cache clients with the ability to manage the operation of Kanidm as a
    /// database and service
    system_clients: Arc<RwLock<KanidmClients>>,
    /// Cache for Namespace resources
    pub namespace_store: Store<Namespace>,
    /// Cache for Kanidm resources
    pub kanidm_store: Store<Kanidm>,
    /// Kubernetes client
    // NOTE(review): `None` presumably means "running without cluster access"
    // (e.g. metrics-only mode or tests) — confirm against `State::new` callers
    pub client: Option<Client>,
}
61
/// Shared state for a resource stream
///
/// Bundles the parts produced by [`create_subscriber`]: a shared reflector
/// cache of `K`, the writer that feeds it, and a subscription handle for
/// downstream consumers.
pub struct ResourceReflector<K>
where
    K: Resource + Lookup + Clone + 'static,
    <K as Lookup>::DynamicType: Eq + std::hash::Hash + Clone,
{
    /// Read-only handle to the reflected cache of `K` resources
    pub store: Store<K>,
    /// Writer side of the shared store; a watcher stream reflects events into it
    pub writer: Writer<K>,
    /// Subscription handle yielding objects as they land in the shared store
    pub subscriber: ReflectHandle<K>,
}
72
73/// State wrapper around the controller outputs for the web server
74impl State {
75    pub fn new(
76        metrics: metrics::Metrics,
77        namespace_store: Store<Namespace>,
78        kanidm_store: Store<Kanidm>,
79        client: Option<Client>,
80    ) -> Self {
81        Self {
82            metrics: Arc::new(metrics),
83            idm_clients: Arc::default(),
84            system_clients: Arc::default(),
85            namespace_store,
86            kanidm_store,
87            client,
88        }
89    }
90
91    /// Metrics getter
92    pub fn metrics(&self) -> Result<String> {
93        prometheus_exporter::format_prometheus_metrics("kaniop").map_err(|e| {
94            Error::FormattingError(format!("failed to export metrics: {}", e), std::fmt::Error)
95        })
96    }
97
98    /// Create a Controller Context that can update State
99    pub fn to_context<K>(&self, client: Client, controller_id: ControllerId) -> Context<K>
100    where
101        K: Resource + Lookup + Clone + 'static,
102        <K as Lookup>::DynamicType: Default + Eq + std::hash::Hash + Clone,
103    {
104        Context::new(
105            controller_id,
106            client.clone(),
107            self.metrics
108                .controllers
109                .get(controller_id)
110                .expect("all CONTROLLER_IDs have to be registered")
111                .clone(),
112            Recorder::new(client.clone(), controller_id.into()),
113            self.idm_clients.clone(),
114            self.system_clients.clone(),
115            self.namespace_store.clone(),
116            self.kanidm_store.clone(),
117        )
118    }
119}
120
121pub async fn check_api_queryable<K>(client: Client) -> Api<K>
122where
123    K: Resource + Clone + DeserializeOwned + Debug,
124    <K as Resource>::DynamicType: Default,
125{
126    let api = Api::<K>::all(client.clone());
127    if let Err(e) = api.list(&ListParams::default().limit(1)).await {
128        error!(
129            "{} is not queryable; {e:?}. Check controller permissions",
130            short_type_name::<K>().unwrap_or("Unknown resource"),
131        );
132        std::process::exit(1);
133    }
134    api
135}
136
137pub fn create_subscriber<K>(buffer_size: usize) -> ResourceReflector<K>
138where
139    K: Resource + Lookup + Clone + 'static,
140    <K as Lookup>::DynamicType: Default + Eq + std::hash::Hash + Clone,
141    <K as Resource>::DynamicType: Default + Eq + std::hash::Hash + Clone,
142{
143    let (store, writer) = reflector::store_shared::<K>(buffer_size);
144    let subscriber = writer
145        .subscribe()
146        .expect("subscribers can only be created from shared stores");
147
148    ResourceReflector {
149        store,
150        writer,
151        subscriber,
152    }
153}
154
155fn create_generic_watcher<K, W, T, S, StreamT>(
156    api: Api<K>,
157    writer: Writer<W>,
158    reload_tx: mpsc::Sender<()>,
159    controller_id: ControllerId,
160    ctx: Arc<Context<T>>,
161    stream_fn: S,
162) -> BoxFuture<'static, ()>
163where
164    K: Resource + Lookup + Clone + DeserializeOwned + Send + Sync + Debug + 'static,
165    W: Resource + ResourceExt + Lookup + Clone + Debug + Send + Sync + 'static,
166    <W as Resource>::DynamicType: Eq + std::hash::Hash + Clone + Send + Sync,
167    <K as Lookup>::DynamicType: Default + Eq + std::hash::Hash + Clone + Send + Sync,
168    <K as Resource>::DynamicType: Default + Eq + std::hash::Hash + Clone,
169    <W as Lookup>::DynamicType: Eq + std::hash::Hash + Clone + Send + Sync,
170    T: Resource<DynamicType = ()> + ResourceExt + Lookup + Clone + 'static,
171    <T as Lookup>::DynamicType: Eq + std::hash::Hash + Clone + Send + Sync,
172    S: Fn(Api<K>, watcher::Config) -> StreamT + 'static,
173    StreamT: futures::Stream<Item = Result<watcher::Event<W>, kube::runtime::watcher::Error>>
174        + Send
175        + 'static,
176{
177    let resource_name = short_type_name::<K>().unwrap_or("Unknown");
178
179    stream_fn(
180        api,
181        watcher::Config::default().labels(&format!("{MANAGED_BY_LABEL}=kaniop-{controller_id}")),
182    )
183    .default_backoff()
184    .reflect_shared(writer)
185    .for_each(move |res| {
186        let mut reload_tx_clone = reload_tx.clone();
187        let ctx = ctx.clone();
188        async move {
189            match res {
190                Ok(event) => {
191                    trace!(msg = "watched event", ?event);
192                    match event {
193                        watcher::Event::Delete(d) => {
194                            debug!(
195                                msg = format!("delete event for {resource_name} trigger reconcile"),
196                                namespace = ResourceExt::namespace(&d).unwrap(),
197                                name = d.name_any()
198                            );
199
200                            // TODO: remove for each trigger on delete logic when
201                            // (dispatch delete events issue)[https://github.com/kube-rs/kube/issues/1590]
202                            // is solved
203                            let _ignore_errors = reload_tx_clone.try_send(()).map_err(
204                                |e| error!(msg = "failed to trigger reconcile on delete", %e),
205                            );
206                            ctx.metrics
207                                .triggered_inc(metrics::Action::Delete, resource_name);
208                        }
209                        watcher::Event::Apply(d) => {
210                            debug!(
211                                msg = format!("apply event for {resource_name} trigger reconcile"),
212                                namespace = ResourceExt::namespace(&d).unwrap(),
213                                name = d.name_any()
214                            );
215                            ctx.metrics
216                                .triggered_inc(metrics::Action::Apply, resource_name);
217                        }
218                        _ => {}
219                    }
220                }
221                Err(e) => {
222                    error!(msg = format!("unexpected error when watching {resource_name}"), %e);
223                    ctx.metrics.watch_operations_failed_inc();
224                }
225            }
226        }
227    })
228    .boxed()
229}
230
/// Start a full-object watcher for `K`, reflecting complete `K` objects into
/// the shared store behind `writer`.
///
/// Thin wrapper over `create_generic_watcher` that selects the plain
/// [`watcher`] stream function, so the cache holds the whole resource
/// (spec and status included).
pub fn create_watcher<K, T>(
    api: Api<K>,
    writer: Writer<K>,
    reload_tx: mpsc::Sender<()>,
    controller_id: ControllerId,
    ctx: Arc<Context<T>>,
) -> BoxFuture<'static, ()>
where
    K: Resource + Lookup + Clone + DeserializeOwned + Send + Sync + Debug + 'static,
    <K as Lookup>::DynamicType: Default + Eq + std::hash::Hash + Clone + Send + Sync,
    <K as Resource>::DynamicType: Default + Eq + std::hash::Hash + Clone,
    T: Resource<DynamicType = ()> + ResourceExt + Lookup + Clone + 'static,
    <T as Lookup>::DynamicType: Eq + std::hash::Hash + Clone + Send + Sync,
{
    create_generic_watcher::<K, K, T, _, _>(api, writer, reload_tx, controller_id, ctx, watcher)
}
247
/// Start a metadata-only watcher for `K`, reflecting `PartialObjectMeta<K>`
/// objects into the shared store behind `writer`.
///
/// Thin wrapper over `create_generic_watcher` that selects
/// [`metadata_watcher`], so only object metadata is fetched and cached —
/// cheaper than a full watch for resources whose spec/status is not needed.
pub fn create_metadata_watcher<K, T>(
    api: Api<K>,
    writer: Writer<PartialObjectMeta<K>>,
    reload_tx: mpsc::Sender<()>,
    controller_id: ControllerId,
    ctx: Arc<Context<T>>,
) -> BoxFuture<'static, ()>
where
    K: Resource + Lookup + Clone + DeserializeOwned + Send + Sync + Debug + 'static,
    <K as Lookup>::DynamicType: Default + Eq + std::hash::Hash + Clone + Send + Sync,
    <K as Resource>::DynamicType: Default + Eq + std::hash::Hash + Clone,
    T: Resource<DynamicType = ()> + ResourceExt + Lookup + Clone + 'static,
    <T as Lookup>::DynamicType: Eq + std::hash::Hash + Clone + Send + Sync,
{
    create_generic_watcher::<K, PartialObjectMeta<K>, T, _, _>(
        api,
        writer,
        reload_tx,
        controller_id,
        ctx,
        metadata_watcher,
    )
}
271
/// Error policy handed to the kube controller builder.
///
/// Intentionally unreachable: reconcilers are wrapped with the
/// `backoff_reconciler!` macro, which returns `Ok` in both its success and
/// error arms (handling backoff and requeueing itself), so the controller
/// never routes an error to this function.
pub fn error_policy<K>(_obj: Arc<K>, _error: &Error, _ctx: Arc<Context<K>>) -> Action
where
    K: Resource + Lookup + Clone + 'static,
    <K as Lookup>::DynamicType: Default + Eq + std::hash::Hash + Clone,
{
    unreachable!("Handle in backoff_reconciler macro")
}
279
/// Wrap an inner reconciler with per-object backoff handling.
///
/// Expands to a closure suitable as a controller reconcile function. On
/// success it resets the object's backoff state and forwards the action; on
/// error it logs the failure, increments the reconcile-failure metric, and
/// requeues the object after the next backoff duration — always returning
/// `Ok`, which is why [`error_policy`] is unreachable.
#[macro_export]
macro_rules! backoff_reconciler {
    ($inner_reconciler:ident) => {
        |obj, ctx| async move {
            use $crate::controller::context::BackoffContext;
            match $inner_reconciler(obj.clone(), ctx.clone()).await {
                Ok(action) => {
                    // success: clear any accumulated backoff for this object
                    ctx.reset_backoff(kube::runtime::reflector::ObjectRef::from(obj.as_ref()))
                        .await;
                    Ok(action)
                }
                Err(error) => {
                    // safe unwrap: all resources in the operator are namespace scoped resources
                    let namespace = kube::ResourceExt::namespace(obj.as_ref()).unwrap();
                    let name = kube::ResourceExt::name_any(obj.as_ref());
                    tracing::error!(msg = "failed reconciliation", %namespace, %name, %error);
                    ctx.metrics().reconcile_failure_inc();
                    let backoff_duration = ctx
                        .get_backoff(kube::runtime::reflector::ObjectRef::from(obj.as_ref()))
                        .await;
                    tracing::trace!(
                        msg = format!("backoff duration: {backoff_duration:?}"),
                        %namespace,
                        %name,
                    );
                    Ok(kube::runtime::controller::Action::requeue(backoff_duration))
                }
            }
        }
    };
}