1use std::{fmt::Debug, future::Future, marker::PhantomData, sync::Arc, time::Duration};
2
3use futures::StreamExt;
4use kube::{
5 api::{ObjectMeta, PartialObjectMetaExt, Patch, PatchParams},
6 runtime::{controller::Action, reflector::ObjectRef},
7 Api, Resource,
8};
9use serde::de::DeserializeOwned;
10use tokio::{
11 spawn,
12 sync::oneshot::Sender,
13 task::{JoinError, JoinHandle},
14};
15use tracing::{debug, error, instrument};
16
17#[cfg(feature = "dag")]
19pub mod dag;
20
/// Records the `resource.name` and `resource.namespace` fields on the current
/// tracing span from the given object metadata.
///
/// `$meta` must evaluate to a value (or reference) exposing optional `name`
/// and `namespace` fields. A field whose value is `None` is left unrecorded.
macro_rules! record_resource_metadata {
    ($meta:expr) => {{
        // Bind the argument once: the expression (e.g. `res.meta()`) would
        // otherwise be expanded, and therefore evaluated, once per field.
        let metadata = &$meta;
        let span = tracing::Span::current();
        if let Some(name) = &metadata.name {
            span.record("resource.name", name);
        }
        if let Some(ns) = &metadata.namespace {
            span.record("resource.namespace", ns);
        }
    }};
}
pub(crate) use record_resource_metadata;
33
34pub type OnErrorFn<CONTEXT, ERROR, RESOURCE> =
35 dyn Fn(Arc<RESOURCE>, &ERROR, Arc<CONTEXT>) -> Action + Send + Sync;
36
37#[derive(Debug, thiserror::Error)]
39pub enum StopError {
40 #[error("failed to wait for controller task: {0}")]
41 Join(
42 #[from]
43 #[source]
44 JoinError,
45 ),
46 #[error("failed to send stop signal")]
47 Send,
48}
49
50pub struct Config<
52 CONTEXT: Send + Sync,
53 ERROR: std::error::Error + Send + Sync,
54 RESOURCE: Clone + Debug + DeserializeOwned + Resource<DynamicType = ()> + Send + Sync,
55> {
56 pub controller: ControllerConfig<CONTEXT, ERROR, RESOURCE>,
58 pub watcher: kube::runtime::watcher::Config,
60}
61
62impl<
63 CONTEXT: Send + Sync,
64 ERROR: std::error::Error + Send + Sync,
65 RESOURCE: Clone + Debug + DeserializeOwned + Resource<DynamicType = ()> + Send + Sync,
66 > Default for Config<CONTEXT, ERROR, RESOURCE>
67{
68 fn default() -> Self {
69 Self {
70 controller: Default::default(),
71 watcher: Default::default(),
72 }
73 }
74}
75
76pub struct ControllerConfig<
78 CONTEXT: Send + Sync,
79 ERROR: std::error::Error + Send + Sync,
80 RESOURCE: Clone + Debug + DeserializeOwned + Resource<DynamicType = ()> + Send + Sync,
81> {
82 pub finalizer: Option<String>,
84 pub on_error: Action,
86 pub on_error_fn: Option<Box<OnErrorFn<CONTEXT, ERROR, RESOURCE>>>,
88}
89
90impl<
91 CONTEXT: Send + Sync,
92 ERROR: std::error::Error + Send + Sync,
93 RESOURCE: Clone + Debug + DeserializeOwned + Resource<DynamicType = ()> + Send + Sync,
94 > Default for ControllerConfig<CONTEXT, ERROR, RESOURCE>
95{
96 fn default() -> Self {
97 Self {
98 finalizer: None,
99 on_error: Action::requeue(Duration::from_secs(10)),
100 on_error_fn: None,
101 }
102 }
103}
104
105pub trait Reconciler<
107 CONTEXT: Send + Sync,
108 ERROR: std::error::Error + Send + Sync,
109 RESOURCE: Clone + Debug + DeserializeOwned + Resource<DynamicType = ()> + Send + Sync,
110>: Send + Sync
111{
112 fn reconcile_creation_or_update(
117 &self,
118 res: Arc<RESOURCE>,
119 ctx: Arc<CONTEXT>,
120 ) -> impl Future<Output = Result<Action, ERROR>> + Send;
121
122 fn reconcile_deletion(
127 &self,
128 res: Arc<RESOURCE>,
129 ctx: Arc<CONTEXT>,
130 ) -> impl Future<Output = Result<Action, ERROR>> + Send;
131}
132
133pub struct Controller<
135 RESOURCE: Clone + Debug + DeserializeOwned + Resource<DynamicType = ()> + Send + Sync,
136> {
137 handle: JoinHandle<()>,
138 stop_tx: Sender<()>,
139 _res: PhantomData<RESOURCE>,
140}
141
142impl<
143 RESOURCE: Clone + Debug + DeserializeOwned + Resource<DynamicType = ()> + Send + Sync + 'static,
144 > Controller<RESOURCE>
145{
146 #[instrument(
155 fields(
156 resource.api_version = %RESOURCE::api_version(&()),
157 ),
158 skip(api, rec, ctx, cfg)
159 )]
160 pub fn start<
161 CONTEXT: Send + Sync + 'static,
162 ERROR: std::error::Error + Send + Sync + 'static,
163 RECONCILER: Reconciler<CONTEXT, ERROR, RESOURCE> + 'static,
164 >(
165 api: Api<RESOURCE>,
166 rec: RECONCILER,
167 ctx: Arc<CONTEXT>,
168 cfg: Config<CONTEXT, ERROR, RESOURCE>,
169 ) -> Self {
170 let ctx = Arc::new(Context::new(rec, ctx, cfg.controller, api.clone()));
171 let (stop_tx, stop_rx) = tokio::sync::oneshot::channel();
172 let ctrl = kube::runtime::Controller::new(api, cfg.watcher)
173 .graceful_shutdown_on(async move {
174 if let Err(err) = stop_rx.await {
175 error!("failed to receive stop signal: {err}");
176 }
177 debug!("stop signal received");
178 })
179 .run(Self::reconcile, Self::on_error, ctx)
180 .for_each(Self::on_each);
181 debug!("controller started");
182 Self {
183 handle: spawn(ctrl),
184 stop_tx,
185 _res: PhantomData,
186 }
187 }
188
189 #[instrument(
191 fields(
192 resource.api_version = %RESOURCE::api_version(&()),
193 ),
194 skip(self),
195 )]
196 pub async fn stop(self) -> Result<(), StopError> {
197 debug!("stopping signal");
198 self.stop_tx.send(()).map_err(|_| StopError::Send)?;
199 debug!("waiting for graceful shutdown");
200 self.handle.await?;
201 debug!("controller stopped");
202 Ok(())
203 }
204
205 #[instrument(
206 parent = None,
207 fields(
208 resource.api_version = %RESOURCE::api_version(&()),
209 resource.name,
210 resource.namespace,
211 ),
212 skip(res)
213 )]
214 async fn on_each<ERROR: std::error::Error + Send + Sync + 'static>(
215 res: Result<
216 (ObjectRef<RESOURCE>, Action),
217 kube::runtime::controller::Error<ReconcileError<ERROR>, kube::runtime::watcher::Error>,
218 >,
219 ) {
220 match res {
221 Ok((res, _)) => {
222 let span = tracing::Span::current();
223 span.record("resource.name", res.name);
224 if let Some(ns) = &res.namespace {
225 span.record("resource.namespace", ns);
226 }
227 debug!("resource reconciled");
228 }
229 Err(err) => error!("{err}"),
230 }
231 }
232
233 #[instrument(
234 parent = None,
235 fields(
236 resource.api_version = %RESOURCE::api_version(&()),
237 resource.name,
238 resource.namespace,
239 ),
240 skip(res, err, ctx)
241 )]
242 fn on_error<
243 CONTEXT: Send + Sync,
244 ERROR: std::error::Error + Send + Sync,
245 RECONCILER: Reconciler<CONTEXT, ERROR, RESOURCE>,
246 >(
247 res: Arc<RESOURCE>,
248 err: &ReconcileError<ERROR>,
249 ctx: Arc<Context<CONTEXT, ERROR, RECONCILER, RESOURCE>>,
250 ) -> Action {
251 record_resource_metadata!(res.meta());
252 error!("{err}");
253 match err {
254 ReconcileError::App(err) => {
255 if let Some(on_error_fn) = &ctx.config.on_error_fn {
256 on_error_fn(res, err, ctx.global.clone())
257 } else {
258 ctx.config.on_error.clone()
259 }
260 }
261 _ => ctx.config.on_error.clone(),
262 }
263 }
264
265 #[instrument(
266 parent = None,
267 fields(
268 resource.api_version = %RESOURCE::api_version(&()),
269 resource.name,
270 resource.namespace,
271 ),
272 skip(res, ctx)
273 )]
274 async fn reconcile<
275 CONTEXT: Send + Sync,
276 ERROR: std::error::Error + Send + Sync,
277 RECONCILER: Reconciler<CONTEXT, ERROR, RESOURCE>,
278 >(
279 res: Arc<RESOURCE>,
280 ctx: Arc<Context<CONTEXT, ERROR, RECONCILER, RESOURCE>>,
281 ) -> Result<Action, ReconcileError<ERROR>> {
282 let meta = res.meta();
283 record_resource_metadata!(meta);
284 if meta.deletion_timestamp.is_some() {
285 debug!("resource has been deleted");
286 ctx.reconciler
287 .reconcile_deletion(res, ctx.global.clone())
288 .await
289 .map_err(ReconcileError::App)
290 } else {
291 debug!("resource has been created or updated");
292 if let Some(finalizer) = &ctx.config.finalizer {
293 let mut finalizers = meta.finalizers.clone().unwrap_or_default();
294 if finalizers.contains(finalizer) {
295 debug!("finalizers already contain `{finalizer}`");
296 ctx.reconciler
297 .reconcile_creation_or_update(res, ctx.global.clone())
298 .await
299 .map_err(ReconcileError::App)
300 } else {
301 let name = meta.name.as_ref().ok_or(ReconcileError::ResourceUnnamed)?;
302 finalizers.push(finalizer.clone());
303 let meta = ObjectMeta {
304 finalizers: Some(finalizers),
305 name: Some(name.clone()),
306 namespace: meta.namespace.clone(),
307 ..Default::default()
308 };
309 let partial = meta.into_request_partial::<RESOURCE>();
310 let params = PatchParams::apply(env!("CARGO_PKG_NAME"));
311 let patch = Patch::Apply(&partial);
312 debug!("adding finalizer `{finalizer}` on resource `{name}`");
313 ctx.api
314 .patch_metadata(name, ¶ms, &patch)
315 .await
316 .map_err(ReconcileError::Kube)?;
317 Ok(Action::requeue(Duration::from_secs(0)))
318 }
319 } else {
320 ctx.reconciler
321 .reconcile_creation_or_update(res, ctx.global.clone())
322 .await
323 .map_err(ReconcileError::App)
324 }
325 }
326 }
327}
328
329#[derive(Debug, thiserror::Error)]
330#[error("{0}")]
331enum ReconcileError<ERROR: std::error::Error + Send + Sync> {
332 App(#[source] ERROR),
333 Kube(#[source] kube::Error),
334 #[error("resource is unnamed")]
335 ResourceUnnamed,
336}
337
338struct Context<
339 CONTEXT: Send + Sync,
340 ERROR: std::error::Error + Send + Sync,
341 RECONCILER: Reconciler<CONTEXT, ERROR, RESOURCE>,
342 RESOURCE: Clone + Debug + DeserializeOwned + Resource<DynamicType = ()> + Send + Sync,
343> {
344 api: Api<RESOURCE>,
345 config: ControllerConfig<CONTEXT, ERROR, RESOURCE>,
346 global: Arc<CONTEXT>,
347 reconciler: RECONCILER,
348}
349
350impl<
351 CONTEXT: Send + Sync,
352 ERROR: std::error::Error + Send + Sync,
353 RECONCILER: Reconciler<CONTEXT, ERROR, RESOURCE>,
354 RESOURCE: Clone + Debug + DeserializeOwned + Resource<DynamicType = ()> + Send + Sync,
355 > Context<CONTEXT, ERROR, RECONCILER, RESOURCE>
356{
357 fn new(
358 rec: RECONCILER,
359 ctx: Arc<CONTEXT>,
360 cfg: ControllerConfig<CONTEXT, ERROR, RESOURCE>,
361 api: Api<RESOURCE>,
362 ) -> Self {
363 Self {
364 api,
365 config: cfg,
366 global: ctx,
367 reconciler: rec,
368 }
369 }
370}