kaniop_operator/controller/
mod.rs1pub mod context;
2pub mod kanidm;
3
4use self::{context::Context, kanidm::KanidmClients};
5
6use crate::kanidm::crd::Kanidm;
7use crate::metrics;
8use crate::prometheus_exporter;
9
10use kaniop_k8s_util::error::{Error, Result};
11
12use kaniop_k8s_util::types::short_type_name;
13
14use std::fmt::Debug;
15use std::sync::Arc;
16
17use futures::channel::mpsc;
18use futures::future::BoxFuture;
19use futures::{FutureExt, StreamExt};
20use k8s_openapi::api::core::v1::Namespace;
21use kube::Resource;
22use kube::api::{Api, ListParams, PartialObjectMeta, ResourceExt};
23use kube::client::Client;
24use kube::runtime::controller::Action;
25use kube::runtime::events::Recorder;
26use kube::runtime::reflector::store::Writer;
27use kube::runtime::reflector::{self, Lookup, ReflectHandle, Store};
28use kube::runtime::{WatchStreamExt, metadata_watcher, watcher};
29use serde::de::DeserializeOwned;
30use tokio::sync::RwLock;
31use tokio::time::Duration;
32use tracing::{debug, error, trace};
33
34pub const DEFAULT_RECONCILE_INTERVAL: Duration = Duration::from_secs(60);
35pub const SUBSCRIBE_BUFFER_SIZE: usize = 256;
36pub const RELOAD_BUFFER_SIZE: usize = 16;
37pub const NAME_LABEL: &str = "app.kubernetes.io/name";
38pub const INSTANCE_LABEL: &str = "app.kubernetes.io/instance";
39pub const MANAGED_BY_LABEL: &str = "app.kubernetes.io/managed-by";
40
41pub type ControllerId = &'static str;
42
43#[derive(Clone)]
46pub struct State {
47 metrics: Arc<metrics::Metrics>,
49 idm_clients: Arc<RwLock<KanidmClients>>,
51 system_clients: Arc<RwLock<KanidmClients>>,
54 pub namespace_store: Store<Namespace>,
56 pub kanidm_store: Store<Kanidm>,
58 pub client: Option<Client>,
60}
61
62pub struct ResourceReflector<K>
64where
65 K: Resource + Lookup + Clone + 'static,
66 <K as Lookup>::DynamicType: Eq + std::hash::Hash + Clone,
67{
68 pub store: Store<K>,
69 pub writer: Writer<K>,
70 pub subscriber: ReflectHandle<K>,
71}
72
73impl State {
75 pub fn new(
76 metrics: metrics::Metrics,
77 namespace_store: Store<Namespace>,
78 kanidm_store: Store<Kanidm>,
79 client: Option<Client>,
80 ) -> Self {
81 Self {
82 metrics: Arc::new(metrics),
83 idm_clients: Arc::default(),
84 system_clients: Arc::default(),
85 namespace_store,
86 kanidm_store,
87 client,
88 }
89 }
90
91 pub fn metrics(&self) -> Result<String> {
93 prometheus_exporter::format_prometheus_metrics("kaniop").map_err(|e| {
94 Error::FormattingError(format!("failed to export metrics: {}", e), std::fmt::Error)
95 })
96 }
97
98 pub fn to_context<K>(&self, client: Client, controller_id: ControllerId) -> Context<K>
100 where
101 K: Resource + Lookup + Clone + 'static,
102 <K as Lookup>::DynamicType: Default + Eq + std::hash::Hash + Clone,
103 {
104 Context::new(
105 controller_id,
106 client.clone(),
107 self.metrics
108 .controllers
109 .get(controller_id)
110 .expect("all CONTROLLER_IDs have to be registered")
111 .clone(),
112 Recorder::new(client.clone(), controller_id.into()),
113 self.idm_clients.clone(),
114 self.system_clients.clone(),
115 self.namespace_store.clone(),
116 self.kanidm_store.clone(),
117 )
118 }
119}
120
121pub async fn check_api_queryable<K>(client: Client) -> Api<K>
122where
123 K: Resource + Clone + DeserializeOwned + Debug,
124 <K as Resource>::DynamicType: Default,
125{
126 let api = Api::<K>::all(client.clone());
127 if let Err(e) = api.list(&ListParams::default().limit(1)).await {
128 error!(
129 "{} is not queryable; {e:?}. Check controller permissions",
130 short_type_name::<K>().unwrap_or("Unknown resource"),
131 );
132 std::process::exit(1);
133 }
134 api
135}
136
137pub fn create_subscriber<K>(buffer_size: usize) -> ResourceReflector<K>
138where
139 K: Resource + Lookup + Clone + 'static,
140 <K as Lookup>::DynamicType: Default + Eq + std::hash::Hash + Clone,
141 <K as Resource>::DynamicType: Default + Eq + std::hash::Hash + Clone,
142{
143 let (store, writer) = reflector::store_shared::<K>(buffer_size);
144 let subscriber = writer
145 .subscribe()
146 .expect("subscribers can only be created from shared stores");
147
148 ResourceReflector {
149 store,
150 writer,
151 subscriber,
152 }
153}
154
155fn create_generic_watcher<K, W, T, S, StreamT>(
156 api: Api<K>,
157 writer: Writer<W>,
158 reload_tx: mpsc::Sender<()>,
159 controller_id: ControllerId,
160 ctx: Arc<Context<T>>,
161 stream_fn: S,
162) -> BoxFuture<'static, ()>
163where
164 K: Resource + Lookup + Clone + DeserializeOwned + Send + Sync + Debug + 'static,
165 W: Resource + ResourceExt + Lookup + Clone + Debug + Send + Sync + 'static,
166 <W as Resource>::DynamicType: Eq + std::hash::Hash + Clone + Send + Sync,
167 <K as Lookup>::DynamicType: Default + Eq + std::hash::Hash + Clone + Send + Sync,
168 <K as Resource>::DynamicType: Default + Eq + std::hash::Hash + Clone,
169 <W as Lookup>::DynamicType: Eq + std::hash::Hash + Clone + Send + Sync,
170 T: Resource<DynamicType = ()> + ResourceExt + Lookup + Clone + 'static,
171 <T as Lookup>::DynamicType: Eq + std::hash::Hash + Clone + Send + Sync,
172 S: Fn(Api<K>, watcher::Config) -> StreamT + 'static,
173 StreamT: futures::Stream<Item = Result<watcher::Event<W>, kube::runtime::watcher::Error>>
174 + Send
175 + 'static,
176{
177 let resource_name = short_type_name::<K>().unwrap_or("Unknown");
178
179 stream_fn(
180 api,
181 watcher::Config::default().labels(&format!("{MANAGED_BY_LABEL}=kaniop-{controller_id}")),
182 )
183 .default_backoff()
184 .reflect_shared(writer)
185 .for_each(move |res| {
186 let mut reload_tx_clone = reload_tx.clone();
187 let ctx = ctx.clone();
188 async move {
189 match res {
190 Ok(event) => {
191 trace!(msg = "watched event", ?event);
192 match event {
193 watcher::Event::Delete(d) => {
194 debug!(
195 msg = format!("delete event for {resource_name} trigger reconcile"),
196 namespace = ResourceExt::namespace(&d).unwrap(),
197 name = d.name_any()
198 );
199
200 let _ignore_errors = reload_tx_clone.try_send(()).map_err(
204 |e| error!(msg = "failed to trigger reconcile on delete", %e),
205 );
206 ctx.metrics
207 .triggered_inc(metrics::Action::Delete, resource_name);
208 }
209 watcher::Event::Apply(d) => {
210 debug!(
211 msg = format!("apply event for {resource_name} trigger reconcile"),
212 namespace = ResourceExt::namespace(&d).unwrap(),
213 name = d.name_any()
214 );
215 ctx.metrics
216 .triggered_inc(metrics::Action::Apply, resource_name);
217 }
218 _ => {}
219 }
220 }
221 Err(e) => {
222 error!(msg = format!("unexpected error when watching {resource_name}"), %e);
223 ctx.metrics.watch_operations_failed_inc();
224 }
225 }
226 }
227 })
228 .boxed()
229}
230
231pub fn create_watcher<K, T>(
232 api: Api<K>,
233 writer: Writer<K>,
234 reload_tx: mpsc::Sender<()>,
235 controller_id: ControllerId,
236 ctx: Arc<Context<T>>,
237) -> BoxFuture<'static, ()>
238where
239 K: Resource + Lookup + Clone + DeserializeOwned + Send + Sync + Debug + 'static,
240 <K as Lookup>::DynamicType: Default + Eq + std::hash::Hash + Clone + Send + Sync,
241 <K as Resource>::DynamicType: Default + Eq + std::hash::Hash + Clone,
242 T: Resource<DynamicType = ()> + ResourceExt + Lookup + Clone + 'static,
243 <T as Lookup>::DynamicType: Eq + std::hash::Hash + Clone + Send + Sync,
244{
245 create_generic_watcher::<K, K, T, _, _>(api, writer, reload_tx, controller_id, ctx, watcher)
246}
247
248pub fn create_metadata_watcher<K, T>(
249 api: Api<K>,
250 writer: Writer<PartialObjectMeta<K>>,
251 reload_tx: mpsc::Sender<()>,
252 controller_id: ControllerId,
253 ctx: Arc<Context<T>>,
254) -> BoxFuture<'static, ()>
255where
256 K: Resource + Lookup + Clone + DeserializeOwned + Send + Sync + Debug + 'static,
257 <K as Lookup>::DynamicType: Default + Eq + std::hash::Hash + Clone + Send + Sync,
258 <K as Resource>::DynamicType: Default + Eq + std::hash::Hash + Clone,
259 T: Resource<DynamicType = ()> + ResourceExt + Lookup + Clone + 'static,
260 <T as Lookup>::DynamicType: Eq + std::hash::Hash + Clone + Send + Sync,
261{
262 create_generic_watcher::<K, PartialObjectMeta<K>, T, _, _>(
263 api,
264 writer,
265 reload_tx,
266 controller_id,
267 ctx,
268 metadata_watcher,
269 )
270}
271
272pub fn error_policy<K>(_obj: Arc<K>, _error: &Error, _ctx: Arc<Context<K>>) -> Action
273where
274 K: Resource + Lookup + Clone + 'static,
275 <K as Lookup>::DynamicType: Default + Eq + std::hash::Hash + Clone,
276{
277 unreachable!("Handle in backoff_reconciler macro")
278}
279
280#[macro_export]
281macro_rules! backoff_reconciler {
282 ($inner_reconciler:ident) => {
283 |obj, ctx| async move {
284 use $crate::controller::context::BackoffContext;
285 match $inner_reconciler(obj.clone(), ctx.clone()).await {
286 Ok(action) => {
287 ctx.reset_backoff(kube::runtime::reflector::ObjectRef::from(obj.as_ref()))
288 .await;
289 Ok(action)
290 }
291 Err(error) => {
292 let namespace = kube::ResourceExt::namespace(obj.as_ref()).unwrap();
294 let name = kube::ResourceExt::name_any(obj.as_ref());
295 tracing::error!(msg = "failed reconciliation", %namespace, %name, %error);
296 ctx.metrics().reconcile_failure_inc();
297 let backoff_duration = ctx
298 .get_backoff(kube::runtime::reflector::ObjectRef::from(obj.as_ref()))
299 .await;
300 tracing::trace!(
301 msg = format!("backoff duration: {backoff_duration:?}"),
302 %namespace,
303 %name,
304 );
305 Ok(kube::runtime::controller::Action::requeue(backoff_duration))
306 }
307 }
308 }
309 };
310}