simple_kube_controller/
lib.rs

1use std::{fmt::Debug, future::Future, marker::PhantomData, sync::Arc, time::Duration};
2
3use futures::StreamExt;
4use kube::{
5    api::{ObjectMeta, PartialObjectMetaExt, Patch, PatchParams},
6    runtime::{controller::Action, reflector::ObjectRef},
7    Api, Resource,
8};
9use serde::de::DeserializeOwned;
10use tokio::{
11    spawn,
12    sync::oneshot::Sender,
13    task::{JoinError, JoinHandle},
14};
15use tracing::{debug, error, instrument};
16
/// DAG-oriented reconciler.
#[cfg(feature = "dag")]
pub mod dag;

/// Record the `name` and `namespace` of an object's metadata on the current tracing span.
///
/// `$meta` must expose `name` and `namespace` fields of type `Option<String>` (e.g. an
/// `ObjectMeta` or a reference to one). A field whose value is `None` is left unrecorded.
/// The current span should declare `resource.name` / `resource.namespace` (for instance as
/// empty fields in `#[instrument(fields(...))]`); `tracing` ignores fields a span does not declare.
macro_rules! record_resource_metadata {
    ($meta:expr) => {{
        let current_span = tracing::Span::current();
        for (field, value) in [
            ("resource.name", &$meta.name),
            ("resource.namespace", &$meta.namespace),
        ] {
            if let Some(value) = value {
                current_span.record(field, value);
            }
        }
    }};
}
pub(crate) use record_resource_metadata;
33
34pub type OnErrorFn<CONTEXT, ERROR, RESOURCE> =
35    dyn Fn(Arc<RESOURCE>, &ERROR, Arc<CONTEXT>) -> Action + Send + Sync;
36
37/// Error when the controller is stopped.
38#[derive(Debug, thiserror::Error)]
39pub enum StopError {
40    #[error("failed to wait for controller task: {0}")]
41    Join(
42        #[from]
43        #[source]
44        JoinError,
45    ),
46    #[error("failed to send stop signal")]
47    Send,
48}
49
50/// A configuration.
51pub struct Config<
52    CONTEXT: Send + Sync,
53    ERROR: std::error::Error + Send + Sync,
54    RESOURCE: Clone + Debug + DeserializeOwned + Resource<DynamicType = ()> + Send + Sync,
55> {
56    /// The controller configuration.
57    pub controller: ControllerConfig<CONTEXT, ERROR, RESOURCE>,
58    /// The watcher configuration.
59    pub watcher: kube::runtime::watcher::Config,
60}
61
62impl<
63        CONTEXT: Send + Sync,
64        ERROR: std::error::Error + Send + Sync,
65        RESOURCE: Clone + Debug + DeserializeOwned + Resource<DynamicType = ()> + Send + Sync,
66    > Default for Config<CONTEXT, ERROR, RESOURCE>
67{
68    fn default() -> Self {
69        Self {
70            controller: Default::default(),
71            watcher: Default::default(),
72        }
73    }
74}
75
76/// A controller configuration.
77pub struct ControllerConfig<
78    CONTEXT: Send + Sync,
79    ERROR: std::error::Error + Send + Sync,
80    RESOURCE: Clone + Debug + DeserializeOwned + Resource<DynamicType = ()> + Send + Sync,
81> {
82    // If defined, add automatically finalizer if it is not present on the resource.
83    pub finalizer: Option<String>,
84    /// Action to do when an error occurred. By default, the event is requeued for 10s.
85    pub on_error: Action,
86    /// Function to execute when an error occurred.
87    pub on_error_fn: Option<Box<OnErrorFn<CONTEXT, ERROR, RESOURCE>>>,
88}
89
90impl<
91        CONTEXT: Send + Sync,
92        ERROR: std::error::Error + Send + Sync,
93        RESOURCE: Clone + Debug + DeserializeOwned + Resource<DynamicType = ()> + Send + Sync,
94    > Default for ControllerConfig<CONTEXT, ERROR, RESOURCE>
95{
96    fn default() -> Self {
97        Self {
98            finalizer: None,
99            on_error: Action::requeue(Duration::from_secs(10)),
100            on_error_fn: None,
101        }
102    }
103}
104
105/// A reconciler.
106pub trait Reconciler<
107    CONTEXT: Send + Sync,
108    ERROR: std::error::Error + Send + Sync,
109    RESOURCE: Clone + Debug + DeserializeOwned + Resource<DynamicType = ()> + Send + Sync,
110>: Send + Sync
111{
112    /// Handle a resource creation/update.
113    /// Arguments:
114    ///   - `res`: the resource
115    ///   - `ctx`: the context of the application
116    fn reconcile_creation_or_update(
117        &self,
118        res: Arc<RESOURCE>,
119        ctx: Arc<CONTEXT>,
120    ) -> impl Future<Output = Result<Action, ERROR>> + Send;
121
122    /// Handle a resource deletion.
123    /// Arguments:
124    ///   - `res`: the resource
125    ///   - `ctx`: the context of the application
126    fn reconcile_deletion(
127        &self,
128        res: Arc<RESOURCE>,
129        ctx: Arc<CONTEXT>,
130    ) -> impl Future<Output = Result<Action, ERROR>> + Send;
131}
132
133/// A controller.
134pub struct Controller<
135    RESOURCE: Clone + Debug + DeserializeOwned + Resource<DynamicType = ()> + Send + Sync,
136> {
137    handle: JoinHandle<()>,
138    stop_tx: Sender<()>,
139    _res: PhantomData<RESOURCE>,
140}
141
142impl<
143        RESOURCE: Clone + Debug + DeserializeOwned + Resource<DynamicType = ()> + Send + Sync + 'static,
144    > Controller<RESOURCE>
145{
146    /// Start a controller.
147    /// Arguments:
148    ///   - `api`: the API object
149    ///   - `rec`: the reconciler to use
150    ///   - `ctx`: the context to give to the reconciler
151    ///   - `cfg`: the configuration (see [`Config`])
152    ///
153    /// The controller is started in background.
154    #[instrument(
155        fields(
156            resource.api_version = %RESOURCE::api_version(&()),
157        ),
158        skip(api, rec, ctx, cfg)
159    )]
160    pub fn start<
161        CONTEXT: Send + Sync + 'static,
162        ERROR: std::error::Error + Send + Sync + 'static,
163        RECONCILER: Reconciler<CONTEXT, ERROR, RESOURCE> + 'static,
164    >(
165        api: Api<RESOURCE>,
166        rec: RECONCILER,
167        ctx: Arc<CONTEXT>,
168        cfg: Config<CONTEXT, ERROR, RESOURCE>,
169    ) -> Self {
170        let ctx = Arc::new(Context::new(rec, ctx, cfg.controller, api.clone()));
171        let (stop_tx, stop_rx) = tokio::sync::oneshot::channel();
172        let ctrl = kube::runtime::Controller::new(api, cfg.watcher)
173            .graceful_shutdown_on(async move {
174                if let Err(err) = stop_rx.await {
175                    error!("failed to receive stop signal: {err}");
176                }
177                debug!("stop signal received");
178            })
179            .run(Self::reconcile, Self::on_error, ctx)
180            .for_each(Self::on_each);
181        debug!("controller started");
182        Self {
183            handle: spawn(ctrl),
184            stop_tx,
185            _res: PhantomData,
186        }
187    }
188
189    /// Stop the controller.
190    #[instrument(
191        fields(
192            resource.api_version = %RESOURCE::api_version(&()),
193        ),
194        skip(self),
195    )]
196    pub async fn stop(self) -> Result<(), StopError> {
197        debug!("stopping signal");
198        self.stop_tx.send(()).map_err(|_| StopError::Send)?;
199        debug!("waiting for graceful shutdown");
200        self.handle.await?;
201        debug!("controller stopped");
202        Ok(())
203    }
204
205    #[instrument(
206        parent = None,
207        fields(
208            resource.api_version = %RESOURCE::api_version(&()),
209            resource.name,
210            resource.namespace,
211        ),
212        skip(res)
213    )]
214    async fn on_each<ERROR: std::error::Error + Send + Sync + 'static>(
215        res: Result<
216            (ObjectRef<RESOURCE>, Action),
217            kube::runtime::controller::Error<ReconcileError<ERROR>, kube::runtime::watcher::Error>,
218        >,
219    ) {
220        match res {
221            Ok((res, _)) => {
222                let span = tracing::Span::current();
223                span.record("resource.name", res.name);
224                if let Some(ns) = &res.namespace {
225                    span.record("resource.namespace", ns);
226                }
227                debug!("resource reconciled");
228            }
229            Err(err) => error!("{err}"),
230        }
231    }
232
233    #[instrument(
234        parent = None,
235        fields(
236            resource.api_version = %RESOURCE::api_version(&()),
237            resource.name,
238            resource.namespace,
239        ),
240        skip(res, err, ctx)
241    )]
242    fn on_error<
243        CONTEXT: Send + Sync,
244        ERROR: std::error::Error + Send + Sync,
245        RECONCILER: Reconciler<CONTEXT, ERROR, RESOURCE>,
246    >(
247        res: Arc<RESOURCE>,
248        err: &ReconcileError<ERROR>,
249        ctx: Arc<Context<CONTEXT, ERROR, RECONCILER, RESOURCE>>,
250    ) -> Action {
251        record_resource_metadata!(res.meta());
252        error!("{err}");
253        match err {
254            ReconcileError::App(err) => {
255                if let Some(on_error_fn) = &ctx.config.on_error_fn {
256                    on_error_fn(res, err, ctx.global.clone())
257                } else {
258                    ctx.config.on_error.clone()
259                }
260            }
261            _ => ctx.config.on_error.clone(),
262        }
263    }
264
265    #[instrument(
266        parent = None,
267        fields(
268            resource.api_version = %RESOURCE::api_version(&()),
269            resource.name,
270            resource.namespace,
271        ),
272        skip(res, ctx)
273    )]
274    async fn reconcile<
275        CONTEXT: Send + Sync,
276        ERROR: std::error::Error + Send + Sync,
277        RECONCILER: Reconciler<CONTEXT, ERROR, RESOURCE>,
278    >(
279        res: Arc<RESOURCE>,
280        ctx: Arc<Context<CONTEXT, ERROR, RECONCILER, RESOURCE>>,
281    ) -> Result<Action, ReconcileError<ERROR>> {
282        let meta = res.meta();
283        record_resource_metadata!(meta);
284        if meta.deletion_timestamp.is_some() {
285            debug!("resource has been deleted");
286            ctx.reconciler
287                .reconcile_deletion(res, ctx.global.clone())
288                .await
289                .map_err(ReconcileError::App)
290        } else {
291            debug!("resource has been created or updated");
292            if let Some(finalizer) = &ctx.config.finalizer {
293                let mut finalizers = meta.finalizers.clone().unwrap_or_default();
294                if finalizers.contains(finalizer) {
295                    debug!("finalizers already contain `{finalizer}`");
296                    ctx.reconciler
297                        .reconcile_creation_or_update(res, ctx.global.clone())
298                        .await
299                        .map_err(ReconcileError::App)
300                } else {
301                    let name = meta.name.as_ref().ok_or(ReconcileError::ResourceUnnamed)?;
302                    finalizers.push(finalizer.clone());
303                    let meta = ObjectMeta {
304                        finalizers: Some(finalizers),
305                        name: Some(name.clone()),
306                        namespace: meta.namespace.clone(),
307                        ..Default::default()
308                    };
309                    let partial = meta.into_request_partial::<RESOURCE>();
310                    let params = PatchParams::apply(env!("CARGO_PKG_NAME"));
311                    let patch = Patch::Apply(&partial);
312                    debug!("adding finalizer `{finalizer}` on resource `{name}`");
313                    ctx.api
314                        .patch_metadata(name, &params, &patch)
315                        .await
316                        .map_err(ReconcileError::Kube)?;
317                    Ok(Action::requeue(Duration::from_secs(0)))
318                }
319            } else {
320                ctx.reconciler
321                    .reconcile_creation_or_update(res, ctx.global.clone())
322                    .await
323                    .map_err(ReconcileError::App)
324            }
325        }
326    }
327}
328
329#[derive(Debug, thiserror::Error)]
330#[error("{0}")]
331enum ReconcileError<ERROR: std::error::Error + Send + Sync> {
332    App(#[source] ERROR),
333    Kube(#[source] kube::Error),
334    #[error("resource is unnamed")]
335    ResourceUnnamed,
336}
337
338struct Context<
339    CONTEXT: Send + Sync,
340    ERROR: std::error::Error + Send + Sync,
341    RECONCILER: Reconciler<CONTEXT, ERROR, RESOURCE>,
342    RESOURCE: Clone + Debug + DeserializeOwned + Resource<DynamicType = ()> + Send + Sync,
343> {
344    api: Api<RESOURCE>,
345    config: ControllerConfig<CONTEXT, ERROR, RESOURCE>,
346    global: Arc<CONTEXT>,
347    reconciler: RECONCILER,
348}
349
350impl<
351        CONTEXT: Send + Sync,
352        ERROR: std::error::Error + Send + Sync,
353        RECONCILER: Reconciler<CONTEXT, ERROR, RESOURCE>,
354        RESOURCE: Clone + Debug + DeserializeOwned + Resource<DynamicType = ()> + Send + Sync,
355    > Context<CONTEXT, ERROR, RECONCILER, RESOURCE>
356{
357    fn new(
358        rec: RECONCILER,
359        ctx: Arc<CONTEXT>,
360        cfg: ControllerConfig<CONTEXT, ERROR, RESOURCE>,
361        api: Api<RESOURCE>,
362    ) -> Self {
363        Self {
364            api,
365            config: cfg,
366            global: ctx,
367            reconciler: rec,
368        }
369    }
370}