// controller/controller.rs

use crate::{telemetry, Error, Metrics, Result};
use chrono::{DateTime, Utc};
use futures::{
    future::{BoxFuture, FutureExt},
    stream::StreamExt,
};

use crate::{
    config::Config,
    cronjob::reconcile_cronjob,
    exec::{ExecCommand, ExecOutput},
    psql::{PsqlCommand, PsqlOutput},
    rbac::reconcile_rbac,
    service::reconcile_svc,
    statefulset::{reconcile_sts, stateful_set_from_cdb},
};
use kube::{
    api::{Api, ListParams, Patch, PatchParams, ResourceExt},
    client::Client,
    runtime::{
        controller::{Action, Controller},
        events::{Event, EventType, Recorder, Reporter},
        finalizer::{finalizer, Event as Finalizer},
    },
    Resource,
};

use crate::{
    apis::coredb_types::{CoreDB, CoreDBStatus},
    extensions::{reconcile_extensions, Extension},
    postgres_exporter::{create_postgres_exporter_role, reconcile_prom_configmap},
    secret::reconcile_secret,
};
use k8s_openapi::api::core::v1::{Namespace, Pod};
use kube::runtime::wait::Condition;
use serde::Serialize;
use serde_json::json;
use std::sync::Arc;
use tokio::{sync::RwLock, time::Duration};
use tracing::*;

pub static COREDB_FINALIZER: &str = "coredbs.coredb.io";
pub static COREDB_ANNOTATION: &str = "coredbs.coredb.io/watch";
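
// Illustrative manifest snippet (assumed CRD shape, not from this file): setting
// the watch annotation to "false" opts a CoreDB out of reconciliation, as checked
// in `reconcile` below.
//
//   apiVersion: coredb.io/v1alpha1
//   kind: CoreDB
//   metadata:
//     name: example
//     annotations:
//       coredbs.coredb.io/watch: "false"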

/// Context for our reconciler
#[derive(Clone)]
pub struct Context {
    /// Kubernetes client
    pub client: Client,
    /// Diagnostics read by the web server
    pub diagnostics: Arc<RwLock<Diagnostics>>,
    /// Prometheus metrics
    pub metrics: Metrics,
}

#[instrument(skip(ctx, cdb), fields(trace_id))]
async fn reconcile(cdb: Arc<CoreDB>, ctx: Arc<Context>) -> Result<Action> {
    let cfg = Config::default();
    let trace_id = telemetry::get_trace_id();
    Span::current().record("trace_id", &field::display(&trace_id));
    let _timer = ctx.metrics.count_and_measure();
    ctx.diagnostics.write().await.last_event = Utc::now();
    let ns = cdb.namespace().unwrap(); // cdb is namespace scoped
    let coredbs: Api<CoreDB> = Api::namespaced(ctx.client.clone(), &ns);
    // Get metadata for the CoreDB object
    let metadata = cdb.meta().clone();
    // Get annotations from the metadata
    let annotations = metadata.annotations.clone().unwrap_or_default();

    // Check whether the watch annotation is present and inspect its value
    if let Some(value) = annotations.get(COREDB_ANNOTATION) {
        // If the value is "false", skip reconciling this object
        if value == "false" {
            info!(
                "Skipping reconciliation for CoreDB \"{}\" in {}",
                cdb.name_any(),
                ns
            );
            return Ok(Action::await_change());
        }
    }

    info!("Reconciling CoreDB \"{}\" in {}", cdb.name_any(), ns);
    finalizer(&coredbs, COREDB_FINALIZER, cdb, |event| async {
        match event {
            Finalizer::Apply(cdb) => match cdb.reconcile(ctx.clone(), &cfg).await {
                Ok(action) => Ok(action),
                Err(requeue_action) => Ok(requeue_action),
            },
            Finalizer::Cleanup(cdb) => cdb.cleanup(ctx.clone()).await,
        }
    })
    .await
    .map_err(|e| Error::FinalizerError(Box::new(e)))
}

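/// Requeue policy handed to the Controller: when `reconcile` returns an error,
/// record the failure in metrics and retry after five minutes.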
fn error_policy(cdb: Arc<CoreDB>, error: &Error, ctx: Arc<Context>) -> Action {
    warn!("reconcile failed: {:?}", error);
    ctx.metrics.reconcile_failure(&cdb, error);
    Action::requeue(Duration::from_secs(5 * 60))
}

impl CoreDB {
    // Reconcile (for non-finalizer-related changes)
    async fn reconcile(&self, ctx: Arc<Context>, cfg: &Config) -> Result<Action, Action> {
        let client = ctx.client.clone();
        let _recorder = ctx.diagnostics.read().await.recorder(client.clone(), self);
        let ns = self.namespace().unwrap();
        let name = self.name_any();
        let coredbs: Api<CoreDB> = Api::namespaced(client.clone(), &ns);

        // Create or update the Prometheus ConfigMap when the postgres exporter is enabled
        if self.spec.postgresExporterEnabled {
            reconcile_prom_configmap(self, client.clone(), &ns)
                .await
                .map_err(|e| {
                    error!("Error reconciling prometheus configmap: {:?}", e);
                    Action::requeue(Duration::from_secs(10))
                })?;
        }

        // Reconcile service account, role, and role binding
        reconcile_rbac(self, ctx.clone()).await.map_err(|e| {
            error!("Error reconciling service account: {:?}", e);
            Action::requeue(Duration::from_secs(300))
        })?;

        // Reconcile secret
        reconcile_secret(self, ctx.clone()).await.map_err(|e| {
            error!("Error reconciling secret: {:?}", e);
            Action::requeue(Duration::from_secs(10))
        })?;

        // Reconcile cronjob for backups
        reconcile_cronjob(self, ctx.clone()).await.map_err(|e| {
            error!("Error reconciling cronjob: {:?}", e);
            Action::requeue(Duration::from_secs(10))
        })?;

        // Reconcile statefulset
        reconcile_sts(self, ctx.clone()).await.map_err(|e| {
            error!("Error reconciling statefulset: {:?}", e);
            Action::requeue(Duration::from_secs(10))
        })?;

        // Reconcile service
        reconcile_svc(self, ctx.clone()).await.map_err(|e| {
            error!("Error reconciling service: {:?}", e);
            Action::requeue(Duration::from_secs(10))
        })?;

        let new_status = match self.spec.stop {
            false => {
                let primary_pod = self.primary_pod(ctx.client.clone()).await;
                if primary_pod.is_err() {
                    debug!("Did not find primary pod");
                    return Ok(Action::requeue(Duration::from_secs(1)));
                }
                let primary_pod = primary_pod.unwrap();

                if !is_postgres_ready().matches_object(Some(&primary_pod)) {
                    debug!("Postgres is not ready");
                    return Ok(Action::requeue(Duration::from_secs(1)));
                }
                // Creating the exporter role is a prerequisite for the postgres pod becoming "ready"
                create_postgres_exporter_role(self, ctx.clone())
                    .await
                    .map_err(|e| {
                        error!(
                            "Error creating postgres_exporter on CoreDB {}, {}",
                            self.metadata.name.clone().unwrap(),
                            e
                        );
                        Action::requeue(Duration::from_secs(5))
                    })?;

                if !is_pod_ready().matches_object(Some(&primary_pod)) {
                    debug!("Primary pod is not ready");
                    return Ok(Action::requeue(Duration::from_secs(1)));
                }

                let extensions: Vec<Extension> = reconcile_extensions(self, ctx.clone(), &coredbs, &name)
                    .await
                    .map_err(|e| {
                        error!("Error reconciling extensions: {:?}", e);
                        Action::requeue(Duration::from_secs(10))
                    })?;

                // Run the initial backup only when cfg.enable_initial_backup is set
                if cfg.enable_initial_backup {
                    let backup_command = vec![
                        "/bin/sh".to_string(),
                        "-c".to_string(),
                        "/usr/bin/wal-g backup-push /var/lib/postgresql/data --full --verify".to_string(),
                    ];

                    let _backup_result = self
                        .exec(primary_pod.name_any(), client, &backup_command)
                        .await
                        .map_err(|e| {
                            error!("Error running backup: {:?}", e);
                            Action::requeue(Duration::from_secs(10))
                        })?;
                }

                CoreDBStatus {
                    running: true,
                    extensionsUpdating: false,
                    storage: self.spec.storage.clone(),
                    sharedirStorage: self.spec.sharedirStorage.clone(),
                    pkglibdirStorage: self.spec.pkglibdirStorage.clone(),
                    extensions: Some(extensions),
                }
            }
            true => CoreDBStatus {
                running: false,
                extensionsUpdating: false,
                storage: self.spec.storage.clone(),
                sharedirStorage: self.spec.sharedirStorage.clone(),
                pkglibdirStorage: self.spec.pkglibdirStorage.clone(),
                extensions: self.status.clone().and_then(|f| f.extensions),
            },
        };

        let patch_status = json!({
            "apiVersion": "coredb.io/v1alpha1",
            "kind": "CoreDB",
            "status": new_status
        });

        patch_cdb_status_force(&coredbs, &name, patch_status).await?;

        // If no events were received, check back every minute
        Ok(Action::requeue(Duration::from_secs(60)))
    }

    // Finalizer cleanup (the object was deleted; ensure nothing is orphaned)
    async fn cleanup(&self, ctx: Arc<Context>) -> Result<Action> {
        // If the namespace is terminating, do not publish a delete event. Attempting to
        // publish an event in a terminating namespace can wedge the namespace so that it
        // hangs in the Terminating state.
        let ns_api: Api<Namespace> = Api::all(ctx.client.clone());
        let ns = ns_api
            .get_status(self.metadata.namespace.as_ref().unwrap())
            .await
            .map_err(Error::KubeError)?;
        let phase = ns.status.and_then(|s| s.phase);
        if phase == Some("Terminating".to_string()) {
            return Ok(Action::await_change());
        }
        let recorder = ctx.diagnostics.read().await.recorder(ctx.client.clone(), self);
        // CoreDB doesn't have dependencies in this example case, so we just publish an event
        recorder
            .publish(Event {
                type_: EventType::Normal,
                reason: "DeleteCoreDB".into(),
                note: Some(format!("Delete `{}`", self.name_any())),
                action: "Reconciling".into(),
                secondary: None,
            })
            .await
            .map_err(Error::KubeError)?;
        Ok(Action::await_change())
    }

    pub async fn primary_pod(&self, client: Client) -> Result<Pod, Error> {
        let sts = stateful_set_from_cdb(self);
        let sts_name = sts.metadata.name.unwrap();
        let sts_namespace = sts.metadata.namespace.unwrap();
        let label_selector = format!("statefulset={sts_name}");
        let list_params = ListParams::default().labels(&label_selector);
        let pods: Api<Pod> = Api::namespaced(client, &sts_namespace);
        let pods = pods.list(&list_params);
        // Return an error if the query fails
        let pod_list = pods.await.map_err(Error::KubeError)?;
        // Return an error if the list is empty
        if pod_list.items.is_empty() {
            return Err(Error::KubeError(kube::Error::Api(kube::error::ErrorResponse {
                status: "404".to_string(),
                message: "No pods found".to_string(),
                reason: "Not Found".to_string(),
                code: 404,
            })));
        }
        let primary = pod_list.items[0].clone();
        Ok(primary)
    }

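    /// Run a SQL command through `psql` on the primary pod.
    ///
    /// A minimal usage sketch (hypothetical variables, not from this file):
    ///
    /// ```ignore
    /// let output = cdb
    ///     .psql("SELECT 1;".to_string(), "postgres".to_string(), client)
    ///     .await?;
    /// ```
    ///
    /// Note that this unwraps `primary_pod`, so it panics if no primary pod exists yet.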
    pub async fn psql(
        &self,
        command: String,
        database: String,
        client: Client,
    ) -> Result<PsqlOutput, kube::Error> {
        let pod_name = self
            .primary_pod(client.clone())
            .await
            .unwrap()
            .metadata
            .name
            .unwrap();

        PsqlCommand::new(
            pod_name,
            self.metadata.namespace.clone().unwrap(),
            command,
            database,
            client,
        )
        .execute()
        .await
    }

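    /// Execute a command in a pod belonging to this CoreDB's namespace.
    ///
    /// A minimal usage sketch (hypothetical command, not from this file):
    ///
    /// ```ignore
    /// let cmd = vec!["pg_isready".to_string()];
    /// let output = cdb.exec(pod_name, client, &cmd).await?;
    /// ```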
    pub async fn exec(
        &self,
        pod_name: String,
        client: Client,
        command: &[String],
    ) -> Result<ExecOutput, Error> {
        ExecCommand::new(pod_name, self.metadata.namespace.clone().unwrap(), client)
            .execute(command)
            .await
    }
}

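/// Condition that holds once the pod's `ContainersReady` condition is `True`.
///
/// A sketch of pairing this with kube's wait helper (hypothetical pod name,
/// not from this file):
///
/// ```ignore
/// use kube::runtime::wait::await_condition;
/// await_condition(pods, "coredb-sample-0", is_pod_ready()).await?;
/// ```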
pub fn is_pod_ready() -> impl Condition<Pod> + 'static {
    move |obj: Option<&Pod>| {
        if let Some(pod) = &obj {
            if let Some(status) = &pod.status {
                if let Some(conds) = &status.conditions {
                    if let Some(pcond) = conds.iter().find(|c| c.type_ == "ContainersReady") {
                        return pcond.status == "True";
                    }
                }
            }
        }
        false
    }
}

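/// Condition that holds once the container named "postgres" reports ready.
/// Unlike `is_pod_ready`, this does not require every container in the pod
/// (e.g. the exporter) to be ready.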
pub fn is_postgres_ready() -> impl Condition<Pod> + 'static {
    move |obj: Option<&Pod>| {
        if let Some(pod) = &obj {
            if let Some(status) = &pod.status {
                if let Some(container_statuses) = &status.container_statuses {
                    for container in container_statuses {
                        if container.name == "postgres" {
                            return container.ready;
                        }
                    }
                }
            }
        }
        false
    }
}

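/// Server-side apply of the status subresource under the "cntrlr" field manager,
/// with `force()` so this controller takes ownership of conflicting fields.
///
/// The expected patch shape, mirroring the one built in `reconcile` (illustrative
/// name, not from this file):
///
/// ```ignore
/// let patch = serde_json::json!({
///     "apiVersion": "coredb.io/v1alpha1",
///     "kind": "CoreDB",
///     "status": { "running": true }
/// });
/// patch_cdb_status_force(&coredbs, "example", patch).await?;
/// ```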
pub async fn patch_cdb_status_force(
    cdb: &Api<CoreDB>,
    name: &str,
    patch: serde_json::Value,
) -> Result<(), Action> {
    let ps = PatchParams::apply("cntrlr").force();
    let patch_status = Patch::Apply(patch);
    let _o = cdb.patch_status(name, &ps, &patch_status).await.map_err(|e| {
        error!("Error updating CoreDB status: {:?}", e);
        Action::requeue(Duration::from_secs(10))
    })?;
    Ok(())
}

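/// JSON merge-patch of the status subresource: only the fields present in
/// `patch` are updated, and no field ownership is forced.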
pub async fn patch_cdb_status_merge(
    cdb: &Api<CoreDB>,
    name: &str,
    patch: serde_json::Value,
) -> Result<(), Action> {
    let pp = PatchParams::default();
    let patch_status = Patch::Merge(patch);
    let _o = cdb.patch_status(name, &pp, &patch_status).await.map_err(|e| {
        error!("Error updating CoreDB status: {:?}", e);
        Action::requeue(Duration::from_secs(10))
    })?;
    Ok(())
}

/// Diagnostics to be exposed by the web server
#[derive(Clone, Serialize)]
pub struct Diagnostics {
    pub last_event: DateTime<Utc>,
    #[serde(skip)]
    pub reporter: Reporter,
}

impl Default for Diagnostics {
    fn default() -> Self {
        Self {
            last_event: Utc::now(),
            reporter: "coredb-controller".into(),
        }
    }
}

impl Diagnostics {
    fn recorder(&self, client: Client, cdb: &CoreDB) -> Recorder {
        Recorder::new(client, self.reporter.clone(), cdb.object_ref(&()))
    }
}

/// State shared between the controller and the web server
#[derive(Clone, Default)]
pub struct State {
    /// Diagnostics populated by the reconciler
    diagnostics: Arc<RwLock<Diagnostics>>,
    /// Metrics registry
    registry: prometheus::Registry,
}

/// State wrapper around the controller outputs for the web server
impl State {
    /// Metrics getter
    pub fn metrics(&self) -> Vec<prometheus::proto::MetricFamily> {
        self.registry.gather()
    }

    /// State getter
    pub async fn diagnostics(&self) -> Diagnostics {
        self.diagnostics.read().await.clone()
    }

    /// Create a Controller Context that can update State
    pub fn create_context(&self, client: Client) -> Arc<Context> {
        Arc::new(Context {
            client,
            metrics: Metrics::default().register(&self.registry).unwrap(),
            diagnostics: self.diagnostics.clone(),
        })
    }
}

/// Initialize the controller and shared state (given the CRD is installed)
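///
/// A minimal embedding sketch (hypothetical `main`, not from this file):
///
/// ```ignore
/// let client = kube::Client::try_default().await.expect("failed to create kube client");
/// let (controller, state) = init(client).await;
/// // `state` serves metrics and diagnostics from a web server; the boxed
/// // future drives the controller until shutdown.
/// controller.await;
/// ```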
pub async fn init(client: Client) -> (BoxFuture<'static, ()>, State) {
    let state = State::default();
    let cdb = Api::<CoreDB>::all(client.clone());
    if let Err(e) = cdb.list(&ListParams::default().limit(1)).await {
        error!("CRD is not queryable; {e:?}. Is the CRD installed?");
        info!("Installation: cargo run --bin crdgen | kubectl apply -f -");
        std::process::exit(1);
    }
    let controller = Controller::new(cdb, ListParams::default())
        .shutdown_on_signal()
        .run(reconcile, error_policy, state.create_context(client))
        .filter_map(|x| async move { Result::ok(x) })
        .for_each(|_| futures::future::ready(()))
        .boxed();
    (controller, state)
}

// Tests rely on fixtures.rs
#[cfg(test)]
mod test {
    use super::{reconcile, Context, CoreDB};
    use std::sync::Arc;

    #[tokio::test]
    async fn new_coredbs_without_finalizers_gets_a_finalizer() {
        let (testctx, fakeserver, _) = Context::test();
        let coredb = CoreDB::test();
        // Verify that the coredb gets a finalizer attached during reconcile
        fakeserver.handle_finalizer_creation(&coredb);
        let res = reconcile(Arc::new(coredb), testctx).await;
        assert!(res.is_ok(), "initial creation succeeds in adding finalizer");
    }

    #[tokio::test]
    async fn test_patches_coredb() {
        let (testctx, fakeserver, _) = Context::test();
        let coredb = CoreDB::test().finalized();
        fakeserver.handle_coredb_patch(&coredb);
        let res = reconcile(Arc::new(coredb), testctx).await;
        assert!(res.is_ok(), "finalized coredb succeeds in its reconciler");
    }
}