controller/controller.rs

use chrono::{DateTime, Utc};
use futures::stream::StreamExt;

use crate::{
    apis::coredb_types::{CoreDB, CoreDBStatus, VolumeSnapshot},
    app_service::manager::reconcile_app_services,
    cloudnativepg::{
        archive::wal::reconcile_last_archive_status,
        backups::Backup,
        cnpg::{
            cnpg_cluster_from_cdb, reconcile_cnpg, reconcile_cnpg_scheduled_backup,
            reconcile_pooler,
        },
        placement::cnpg_placement::PlacementConfig,
        retention::snapshots::cleanup_old_volume_snapshots,
        VOLUME_SNAPSHOT_CLASS_NAME,
    },
    config::Config,
    dedicated_networking::reconcile_dedicated_networking,
    exec::{ExecCommand, ExecOutput},
    extensions::database_queries::is_not_restarting,
    heartbeat::reconcile_heartbeat,
    ingress::reconcile_postgres_ing_route_tcp,
    postgres_certificates::reconcile_certificates,
    psql::{PsqlCommand, PsqlOutput},
    secret::{reconcile_postgres_role_secret, reconcile_secret},
    telemetry, Error, Metrics, Result,
};
use k8s_openapi::{api::core::v1::Pod, apimachinery::pkg::util::intstr::IntOrString};
use kube::{
    api::{Api, ListParams, Patch, PatchParams, ResourceExt},
    client::Client,
    runtime::{
        controller::{Action, Controller},
        events::{Event, EventType, Recorder, Reporter},
        finalizer::{finalizer, Event as Finalizer},
        wait::Condition,
        watcher::Config as watcherConfig,
    },
    Resource,
};

use crate::cloudnativepg::hibernate::reconcile_cluster_hibernation;
use crate::{
    apis::postgres_parameters::PgConfig,
    configmap::reconcile_generic_metrics_configmap,
    extensions::{database_queries::list_config_params, reconcile_extensions},
    ingress::{reconcile_extra_postgres_ing_route_tcp, reconcile_ip_allowlist_middleware},
    network_policies::reconcile_network_policies,
    postgres_exporter::reconcile_metrics_configmap,
    trunk::{extensions_that_require_load, reconcile_trunk_configmap},
};
use k8s_openapi::api::core::v1::Secret;
use serde::Serialize;
use serde_json::json;
use std::sync::Arc;
use tokio::{sync::RwLock, time::Duration};
use tracing::*;

pub static COREDB_FINALIZER: &str = "coredbs.coredb.io";
pub static COREDB_ANNOTATION: &str = "coredbs.coredb.io/watch";

// Context for our reconciler
#[derive(Clone)]
pub struct Context {
    /// Kubernetes client
    pub client: Client,
    /// Event recorder
    pub recorder: Recorder,
    /// Diagnostics read by the web server
    pub diagnostics: Arc<RwLock<Diagnostics>>,
    /// Prometheus metrics
    pub metrics: Arc<Metrics>,
}

pub fn requeue_normal_with_jitter() -> Action {
    let cfg = Config::default();
    // Check back every 90-150 seconds
    let jitter = rand::Rng::random_range(&mut rand::rng(), 0..60);
    Action::requeue(Duration::from_secs(cfg.reconcile_ttl + jitter))
}

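// Top-level reconcile entry point: records diagnostics, honors the `coredbs.coredb.io/watch`
// annotation, and routes apply/cleanup events through the finalizer.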
#[instrument(skip(ctx, cdb), fields(trace_id))]
async fn reconcile(cdb: Arc<CoreDB>, ctx: Arc<Context>) -> Result<Action> {
    let trace_id = telemetry::get_trace_id();
    Span::current().record("trace_id", field::display(&trace_id));
    let cfg = Config::default();
    let _timer = ctx.metrics.reconcile.count_and_measure(&trace_id);
    ctx.diagnostics.write().await.last_event = Utc::now();
    let ns = cdb.namespace().unwrap(); // cdb is namespace scoped
    let coredbs: Api<CoreDB> = Api::namespaced(ctx.client.clone(), &ns);
    // Get metadata for the CoreDB object
    let metadata = cdb.meta();
    // Get annotations from the metadata
    let annotations = metadata.annotations.clone().unwrap_or_default();

    // Check whether the watch annotation exists and, if so, inspect its value
    if let Some(value) = annotations.get(COREDB_ANNOTATION) {
        // If the value is false, then we should skip reconciling
        if value == "false" {
            info!(
                "Skipping reconciliation for CoreDB \"{}\" in {}",
                cdb.name_any(),
                ns
            );
            return Ok(Action::await_change());
        }
    }

    debug!("Reconciling CoreDB \"{}\" in {}", cdb.name_any(), ns);
    finalizer(&coredbs, COREDB_FINALIZER, cdb, |event| async {
        match event {
            Finalizer::Apply(cdb) => match cdb.reconcile(ctx.clone(), &cfg).await {
                Ok(action) => Ok(action),
                Err(requeue_action) => Ok(requeue_action),
            },
            Finalizer::Cleanup(cdb) => cdb.cleanup(ctx.clone()).await,
        }
    })
    .await
    .map_err(|e| Error::FinalizerError(Box::new(e)))
}

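// Requeue policy applied when reconciliation fails: HTTP 429 responses back off for 60s plus
// up to 120s of jitter, while all other errors requeue after five minutes.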
pub(crate) fn error_policy(cdb: Arc<CoreDB>, error: &Error, ctx: Arc<Context>) -> Action {
    warn!("reconcile failed: {:?}", error);
    ctx.metrics.reconcile.set_failure(&cdb, error);

    // Check for 429 error code from Kubernetes API
    match error {
        Error::KubeError(kube_error) => match kube_error {
            kube::Error::Api(api_error) if api_error.code == 429 => {
                // Error is a 429 (too many requests), calculate backoff and jitter
                let backoff: u64 = 60;
                let max_jitter: u64 = 120;
                let jitter = rand::Rng::random_range(&mut rand::rng(), 0..=max_jitter);
                let backoff_with_jitter = Duration::from_secs(backoff + jitter);
                // Log the 429 error and the calculated backoff time
                warn!(
                    "Received HTTP 429 Too Many Requests. Requeuing after {} seconds.",
                    backoff_with_jitter.as_secs()
                );
                Action::requeue(backoff_with_jitter)
            }
            _ => Action::requeue(Duration::from_secs(5 * 60)),
        },
        _ => Action::requeue(Duration::from_secs(5 * 60)),
    }
}

// create_volume_snapshot_patch creates a patch for the CoreDB spec to enable or disable volume snapshots
// based on the value of cfg.enable_volume_snapshot.
fn create_volume_snapshot_patch(cfg: &Config) -> serde_json::Value {
    json!({
        "spec": {
            "backup": {
                "volumeSnapshot": {
                    "enabled": cfg.enable_volume_snapshot,
                    "snapshotClass": if cfg.enable_volume_snapshot {
                        Some(VOLUME_SNAPSHOT_CLASS_NAME.to_string())
                    } else {
                        None
                    }
                }
            }
        }
    })
}

// is_volume_snapshot_update_needed checks if the volume snapshot needs to be updated in the CoreDB spec.
fn is_volume_snapshot_update_needed(
    volume_snapshot: Option<&VolumeSnapshot>,
    enable_volume_snapshot: bool,
) -> bool {
    let current_enabled = volume_snapshot.map(|vs| vs.enabled).unwrap_or(false);
    current_enabled != enable_volume_snapshot
}

impl CoreDB {
    // Reconcile (for non-finalizer related changes)
    #[instrument(skip(self, ctx, cfg))]
    async fn reconcile(&self, ctx: Arc<Context>, cfg: &Config) -> Result<Action, Action> {
        let client = ctx.client.clone();
        let ns = self.namespace().unwrap();
        let name = self.name_any();
        let coredbs: Api<CoreDB> = Api::namespaced(client.clone(), &ns);

        // If the cluster is stopped, apply hibernation and exit
        reconcile_cluster_hibernation(self, &ctx).await?;

        // Setup Node/Pod Placement Configuration for the Pooler and App Service deployments
        let placement_config = PlacementConfig::new(self);

        reconcile_network_policies(ctx.client.clone(), &ns).await?;

        // Fetch any metadata we need from Trunk
        reconcile_trunk_configmap(ctx.client.clone(), &ns).await?;

        reconcile_certificates(ctx.client.clone(), self, &ns).await?;

        // Check if we need to delete the IngressRouteTCP and MiddlewareTCP resources
        let delete = self.spec.replicas < 1 || self.spec.stop || self.spec.disable_ingress;

        // Ingress
        match std::env::var("DATA_PLANE_BASEDOMAIN") {
            Ok(basedomain) => {
                debug!(
                    "DATA_PLANE_BASEDOMAIN is set to {}, reconciling IngressRouteTCP and MiddlewareTCP for {}",
                    basedomain, name.clone()
                );

                let middleware_name = reconcile_ip_allowlist_middleware(self, ctx.clone())
                    .await
                    .map_err(|e| {
                        error!("Error reconciling MiddlewareTCP for {}: {:?}", name, e);
                        Action::requeue(Duration::from_secs(300))
                    })?;

                let service_name_read_only = format!("{}-ro", self.name_any().as_str());
                let prefix_read_only = format!("{}-ro-", self.name_any().as_str());
                let read_only_subdomain = format!("{}-ro", self.name_any().as_str());
                reconcile_postgres_ing_route_tcp(
                    self,
                    ctx.clone(),
                    &read_only_subdomain,
                    basedomain.as_str(),
                    ns.as_str(),
                    prefix_read_only.as_str(),
                    service_name_read_only.as_str(),
                    IntOrString::Int(5432),
                    vec![middleware_name.clone()],
                    delete,
                )
                .await
                .map_err(|e| {
                    error!("Error reconciling postgres ingress route: {:?}", e);
                    // Unexpected errors should requeue for at least several minutes; expected,
                    // "waiting"-style requeues should be much shorter, just a few seconds.
                    // IngressRouteTCP does not have expected errors during reconciliation.
                    Action::requeue(Duration::from_secs(300))
                })?;

                let service_name_read_write = format!("{}-rw", self.name_any().as_str());
                let prefix_read_write = format!("{}-rw-", self.name_any().as_str());
                reconcile_postgres_ing_route_tcp(
                    self,
                    ctx.clone(),
                    self.name_any().as_str(),
                    basedomain.as_str(),
                    ns.as_str(),
                    prefix_read_write.as_str(),
                    service_name_read_write.as_str(),
                    IntOrString::Int(5432),
                    vec![middleware_name.clone()],
                    delete,
                )
                .await
                .map_err(|e| {
                    error!("Error reconciling postgres ingress route: {:?}", e);
                    // Unexpected errors should requeue for at least several minutes; expected,
                    // "waiting"-style requeues should be much shorter, just a few seconds.
                    // IngressRouteTCP does not have expected errors during reconciliation.
                    Action::requeue(Duration::from_secs(300))
                })?;

                reconcile_extra_postgres_ing_route_tcp(
                    self,
                    ctx.clone(),
                    ns.as_str(),
                    service_name_read_write.as_str(),
                    IntOrString::Int(5432),
                    vec![middleware_name.clone()],
                )
                .await
                .map_err(|e| {
                    error!("Error reconciling extra postgres ingress route: {:?}", e);
                    // Unexpected errors should requeue for at least several minutes; expected,
                    // "waiting"-style requeues should be much shorter, just a few seconds.
                    // IngressRouteTCP does not have expected errors during reconciliation.
                    Action::requeue(Duration::from_secs(300))
                })?;

                reconcile_dedicated_networking(self, ctx.clone(), basedomain.as_str())
                    .await
                    .map_err(|e| {
                        error!("Error reconciling dedicated networking: {:?}", e);
                        Action::requeue(Duration::from_secs(300))
                    })?;

                let name_pooler = format!("{}-pooler", self.name_any().as_str());
                let prefix_pooler = format!("{}-pooler-", self.name_any().as_str());
                let delete_pooler = delete || !self.spec.connectionPooler.enabled;
                reconcile_postgres_ing_route_tcp(
                    self,
                    ctx.clone(),
                    name_pooler.as_str(),
                    basedomain.as_str(),
                    ns.as_str(),
                    prefix_pooler.as_str(),
                    name_pooler.as_str(),
                    IntOrString::Int(5432),
                    vec![middleware_name.clone()],
                    delete_pooler,
                )
                .await
                .map_err(|e| {
                    error!("Error reconciling pooler ingress route: {:?}", e);
                    // Unexpected errors should requeue for at least several minutes; expected,
                    // "waiting"-style requeues should be much shorter, just a few seconds.
                    // IngressRouteTCP does not have expected errors during reconciliation.
                    Action::requeue(Duration::from_secs(300))
                })?;
            }
            Err(_e) => {
                warn!(
                    "DATA_PLANE_BASEDOMAIN is not set, skipping reconciliation of IngressRouteTCP"
                );
            }
        };

        debug!("Reconciling secret");
        // Superuser connection info
        reconcile_secret(self, ctx.clone()).await?;
        reconcile_app_services(self, ctx.clone(), placement_config.clone()).await?;

        if self
            .spec
            .metrics
            .as_ref()
            .and_then(|m| m.queries.as_ref())
            .is_some()
        {
            debug!("Reconciling prometheus configmap");
            reconcile_metrics_configmap(self, client.clone(), &ns)
                .await
                .map_err(|e| {
                    error!("Error reconciling prometheus configmap: {:?}", e);
                    Action::requeue(Duration::from_secs(300))
                })?;
        }

        let _ = reconcile_postgres_role_secret(
            self,
            ctx.clone(),
            "readonly",
            &format!("{}-ro", name.clone()),
        )
        .await
        .map_err(|e| {
            error!("Error reconciling readonly postgres role secret: {:?}", e);
            Action::requeue(Duration::from_secs(300))
        })?;

        reconcile_generic_metrics_configmap(self, ctx.clone()).await?;

        // Before we reconcile CNPG, make sure spec.backup.volumeSnapshot in the CoreDB spec
        // matches cfg.enable_volume_snapshot. If it already matches, this is a no-op.
        self.enable_volume_snapshot(cfg, ctx.clone()).await?;

        reconcile_cnpg(self, ctx.clone()).await?;
        if cfg.enable_backup {
            reconcile_cnpg_scheduled_backup(self, ctx.clone()).await?;
        }

        // Cleanup old Postgres Exporter Deployments, Service, ServiceAccount, Role and RoleBinding
        crate::deployment_postgres_exporter::cleanup_postgres_exporter(self, ctx.clone())
            .await
            .map_err(|e| {
                error!("Error reconciling prometheus exporter deployment: {:?}", e);
                Action::requeue(Duration::from_secs(300))
            })?;

        // Reconcile Pooler resource
        reconcile_pooler(self, ctx.clone(), placement_config.clone()).await?;

        // Check if Postgres is already running
        let pg_postmaster_start_time = is_not_restarting(self, ctx.clone(), "postgres").await?;

        let patch_status = json!({
            "apiVersion": "coredb.io/v1alpha1",
            "kind": "CoreDB",
            "status": {
                "running": true,
                "pg_postmaster_start_time": pg_postmaster_start_time,
            }
        });
        patch_cdb_status_merge(&coredbs, &name, patch_status).await?;
        let (trunk_installs, extensions) =
            reconcile_extensions(self, ctx.clone(), &coredbs, &name).await?;

        let recovery_time = self
            .get_recovery_time(ctx.clone(), cfg.enable_volume_snapshot)
            .await?;
        let last_archiver_status = reconcile_last_archive_status(self, ctx.clone()).await?;

        let current_config_values = get_current_config_values(self, ctx.clone()).await?;

        #[allow(deprecated)]
        let new_status = CoreDBStatus {
            running: true,
            extensionsUpdating: false,
            storage: Some(self.spec.storage.clone()),
            extensions: Some(extensions),
            trunk_installs: Some(trunk_installs),
            resources: Some(self.spec.resources.clone()),
            runtime_config: Some(current_config_values),
            first_recoverability_time: recovery_time,
            last_fully_reconciled_at: None,
            pg_postmaster_start_time,
            last_archiver_status,
        };

        debug!("Updating CoreDB status to {:?} for {name}", new_status);

        let patch_status = json!({
            "apiVersion": "coredb.io/v1alpha1",
            "kind": "CoreDB",
            "status": new_status
        });

        patch_cdb_status_merge(&coredbs, &name, patch_status).await?;

        reconcile_heartbeat(self, ctx.clone()).await?;

        // If volume snapshots are enabled, clean up snapshots older than the retention period
        // set in cfg.volume_snapshot_retention_period_days
        if cfg.enable_volume_snapshot {
            match cleanup_old_volume_snapshots(
                self,
                client,
                cfg.volume_snapshot_retention_period_days,
            )
            .await
            {
                Ok(_) => {
                    info!(
                        "Successfully cleaned up old volume snapshots for instance: {}",
                        self.name_any()
                    );
                }
                Err(action) => {
                    return Err(action);
                }
            }
        }

        info!("Fully reconciled {}", self.name_any());
        Ok(requeue_normal_with_jitter())
    }

    // enable_volume_snapshot keeps spec.backup.volumeSnapshot in the CoreDB spec in sync with
    // cfg.enable_volume_snapshot. If the spec already matches the config, do nothing.
    #[instrument(skip(self, ctx))]
    async fn enable_volume_snapshot(&self, cfg: &Config, ctx: Arc<Context>) -> Result<(), Action> {
        let client = ctx.client.clone();
        let name = self.name_any();
        let namespace = self.metadata.namespace.as_ref().ok_or_else(|| {
            error!("CoreDB namespace is empty for instance: {}.", name);
            Action::requeue(tokio::time::Duration::from_secs(300))
        })?;

        // Setup the client for the CoreDB
        let coredbs: Api<CoreDB> = Api::namespaced(client.clone(), namespace);

        // Check if an update is needed based on the current value and the desired value from the config
        if !is_volume_snapshot_update_needed(
            self.spec.backup.volume_snapshot.as_ref(),
            cfg.enable_volume_snapshot,
        ) {
            return Ok(());
        }

        // Create the patch to update the spec.backup.volumeSnapshot based on the config
        let patch = create_volume_snapshot_patch(cfg);
        let patch_params = PatchParams {
            field_manager: Some("cntrlr".to_string()),
            ..PatchParams::default()
        };
        let patch_status = Patch::Merge(patch.clone());
        match coredbs.patch(&name, &patch_params, &patch_status).await {
            Ok(_) => {
                debug!("Successfully updated CoreDB spec for {}", name);
                Ok(())
            }
            Err(e) => {
                error!("Error updating CoreDB spec for {}: {:?}", name, e);
                Err(Action::requeue(Duration::from_secs(10)))
            }
        }
    }

    // Finalizer cleanup (the object was deleted, ensure nothing is orphaned)
    #[instrument(skip(self, ctx))]
    async fn cleanup(&self, ctx: Arc<Context>) -> Result<Action> {
        let oref = self.object_ref(&());
        // CoreDB doesn't have any real cleanup, so we just publish an event
        ctx.recorder
            .publish(
                &Event {
                    type_: EventType::Normal,
                    reason: "DeleteCoreDB".into(),
                    note: Some(format!("Delete `{}`", self.name_any())),
                    action: "Deleting".into(),
                    secondary: None,
                },
                &oref,
            )
            .await
            .map_err(Error::KubeError)?;
        Ok(Action::await_change())
    }

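    // Find the CNPG primary pod via the cnpg.io/cluster and role=primary labels. When
    // wait_for_ready is set, requeue until the postgres container reports ready.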
    #[instrument(skip(self, client))]
    async fn primary_pod_cnpg_conditional_readiness(
        &self,
        client: Client,
        wait_for_ready: bool,
    ) -> Result<Pod, Action> {
        let requires_load =
            extensions_that_require_load(client.clone(), &self.metadata.namespace.clone().unwrap())
                .await?;
        let cluster = cnpg_cluster_from_cdb(self, None, requires_load);
        let cluster_name = cluster.metadata.name.as_ref().ok_or_else(|| {
            error!(
                "CNPG Cluster name is empty for instance: {}.",
                self.name_any()
            );
            Action::requeue(tokio::time::Duration::from_secs(300))
        })?;
        let namespace = self.metadata.namespace.as_ref().ok_or_else(|| {
            error!(
                "CoreDB namespace is empty for instance: {}.",
                self.name_any()
            );
            Action::requeue(tokio::time::Duration::from_secs(300))
        })?;
        let cluster_selector = format!("cnpg.io/cluster={}", cluster_name);
        let role_selector = "role=primary";
        let list_params = ListParams::default()
            .labels(&cluster_selector)
            .labels(role_selector);
        let pods: Api<Pod> = Api::namespaced(client, namespace);
        let pods = pods.list(&list_params);
        // Return an error if the query fails
        let pod_list = pods.await.map_err(|_e| {
            // It is not expected to fail the query to the pods API
            error!(
                "Failed to query for CNPG primary pod of {}",
                &self.name_any()
            );
            Action::requeue(Duration::from_secs(300))
        })?;
        // Return an error if the list is empty
        if pod_list.items.is_empty() {
            // The list can legitimately be empty for a moment, so retry after a short duration
            warn!("Failed to find CNPG primary pod of {}, this can be expected if the pod is restarting for some reason", &self.name_any());
            return Err(Action::requeue(Duration::from_secs(5)));
        }
        let primary = pod_list.items[0].clone();

        if wait_for_ready && !is_postgres_ready().matches_object(Some(&primary)) {
            // The pod can briefly be unready, so retry after a short duration
            warn!(
                "Found CNPG primary pod of {}, but it is not ready",
                &self.name_any()
            );
            return Err(Action::requeue(Duration::from_secs(5)));
        }

        Ok(primary)
    }

    #[instrument(skip(self, client))]
    pub async fn primary_pod_cnpg(&self, client: Client) -> Result<Pod, Action> {
        self.primary_pod_cnpg_conditional_readiness(client, true)
            .await
    }

    #[instrument(skip(self, client))]
    pub async fn primary_pod_cnpg_ready_or_not(&self, client: Client) -> Result<Pod, Action> {
        self.primary_pod_cnpg_conditional_readiness(client, false)
            .await
    }

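    // List the pods belonging to the CNPG cluster (primary and replicas) and return only the
    // ones whose Ready condition is True; when wait_for_ready is set, requeue if none are ready.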
    #[instrument(skip(self, client))]
    async fn pods_by_cluster_conditional_readiness(
        &self,
        client: Client,
        wait_for_ready: bool,
    ) -> Result<Vec<Pod>, Action> {
        let requires_load =
            extensions_that_require_load(client.clone(), &self.metadata.namespace.clone().unwrap())
                .await?;
        let cluster = cnpg_cluster_from_cdb(self, None, requires_load);
        let cluster_name = cluster.metadata.name.as_ref().ok_or_else(|| {
            error!(
                "CNPG Cluster name is empty for instance: {}.",
                self.name_any()
            );
            Action::requeue(tokio::time::Duration::from_secs(300))
        })?;
        let namespace = self.metadata.namespace.as_ref().ok_or_else(|| {
            error!(
                "CoreDB namespace is empty for instance: {}.",
                self.name_any()
            );
            Action::requeue(tokio::time::Duration::from_secs(300))
        })?;

        // Select primary and replica pods by their CNPG role labels
        let cluster_selector =
            format!("cnpg.io/cluster={cluster_name},cnpg.io/podRole=instance,role=primary");
        let replica_selector = format!("cnpg.io/cluster={cluster_name},role=replica");

        let list_params_cluster = ListParams::default().labels(&cluster_selector);
        let list_params_replica = ListParams::default().labels(&replica_selector);

        let pods: Api<Pod> = Api::namespaced(client, namespace);
        let primary_pods = pods.list(&list_params_cluster);
        let replica_pods = pods.list(&list_params_replica);

        let primary_pod_list = primary_pods.await.map_err(|_e| {
            error!(
                "Failed to query for CNPG primary pods of {}",
                &self.name_any()
            );
            Action::requeue(Duration::from_secs(300))
        })?;

        let replica_pod_list = replica_pods.await.map_err(|_e| {
            error!(
                "Failed to query for CNPG replica pods of {}",
                &self.name_any()
            );
            Action::requeue(Duration::from_secs(300))
        })?;

        let pod_list = [primary_pod_list.items, replica_pod_list.items].concat();

        if pod_list.is_empty() {
            warn!("Failed to find CNPG pods of {}", &self.name_any());
            return Err(Action::requeue(Duration::from_secs(30)));
        }

        // Filter only pods that are ready
        let ready_pods: Vec<Pod> = pod_list
            .into_iter()
            .filter(|pod| {
                if let Some(conditions) = &pod.status.as_ref().and_then(|s| s.conditions.as_ref()) {
                    conditions
                        .iter()
                        .any(|c| c.type_ == "Ready" && c.status == "True")
                } else {
                    false
                }
            })
            .collect();

        // If we are waiting for readiness and no pod is ready yet, requeue
        if wait_for_ready && ready_pods.is_empty() {
            warn!("Failed to find ready CNPG pods of {}", &self.name_any());
            return Err(Action::requeue(Duration::from_secs(30)));
        }

        Ok(ready_pods)
    }

    #[instrument(skip(self, client))]
    pub async fn pods_by_cluster(&self, client: Client) -> Result<Vec<Pod>, Action> {
        self.pods_by_cluster_conditional_readiness(client, true)
            .await
    }

    #[instrument(skip(self, client))]
    pub async fn pods_by_cluster_ready_or_not(&self, client: Client) -> Result<Vec<Pod>, Action> {
        self.pods_by_cluster_conditional_readiness(client, false)
            .await
    }

    #[instrument(skip(self, client))]
    async fn check_replica_count_matches_pods(&self, client: Client) -> Result<(), Action> {
        // Fetch the desired replica count from the spec
        let desired_replica_count = self.spec.replicas;
        debug!(
            "Instance {} has a desired replica count: {}",
            self.name_any(),
            desired_replica_count
        );

        // Fetch current pods with pods_by_cluster
        let current_pods = self.pods_by_cluster(client.clone()).await?;
        let pod_names: Vec<String> = current_pods.iter().map(|pod| pod.name_any()).collect();
        debug!(
            "Found {} pods, {:?} for {}",
            current_pods.len(),
            pod_names,
            self.name_any()
        );

        // Check if the number of running pods matches the desired replica count
        if current_pods.len() != desired_replica_count as usize {
            warn!(
                "Number of running pods ({}) does not match desired replica count ({}) for ({}). Requeuing.",
                current_pods.len(),
                desired_replica_count,
                self.name_any()
            );
            return Err(Action::requeue(Duration::from_secs(10)));
        }

        info!(
            "Number of running pods ({}) matches desired replica count ({}) for ({}).",
            current_pods.len(),
            desired_replica_count,
            self.name_any()
        );
        Ok(())
    }

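    // Fetch a pod by name and log its current status at debug level, passing any kube error
    // back to the caller.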
    pub async fn log_pod_status(&self, client: Client, pod_name: &str) -> Result<(), kube::Error> {
        let namespace = self
            .metadata
            .namespace
            .clone()
            .expect("CoreDB should have a namespace");
        let pods: Api<Pod> = Api::namespaced(client.clone(), &namespace);
        match pods.get(pod_name).await {
            Ok(pod) => {
                let status = pod
                    .status
                    .as_ref()
                    .map(|s| format!("{:?}", s))
                    .unwrap_or_else(|| "Unknown".to_string());
                debug!(
                    "Status of instance {} pod {} in namespace {}: {}",
                    self.metadata
                        .name
                        .clone()
                        .expect("CoreDB should have a name"),
                    pod_name,
                    namespace,
                    status
                );
                Ok(())
            }
            Err(e) => Err(e),
        }
    }

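    // Run a SQL command via psql against the given database on the CNPG primary pod.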
    #[instrument(skip(self, context))]
    pub async fn psql(
        &self,
        command: String,
        database: String,
        context: Arc<Context>,
    ) -> Result<PsqlOutput, Action> {
        let pod = self.primary_pod_cnpg(context.client.clone()).await?;
        let pod_name_cnpg = pod.metadata.name.as_ref().ok_or_else(|| {
            error!("Pod name is empty for instance: {}.", self.name_any());
            Action::requeue(tokio::time::Duration::from_secs(300))
        })?;

        let cnpg_psql_command = PsqlCommand::new(
            pod_name_cnpg.clone(),
            self.metadata.namespace.clone().unwrap(),
            command,
            database,
            context.clone(),
        );
        debug!("Running exec command in {}", pod_name_cnpg);
        cnpg_psql_command.execute().await
    }

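    // Execute an arbitrary command inside the named pod in this instance's namespace.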
    pub async fn exec(
        &self,
        pod_name: String,
        client: Client,
        command: &[String],
    ) -> Result<ExecOutput, Error> {
        ExecCommand::new(pod_name, self.metadata.namespace.clone().unwrap(), client)
            .execute(command)
            .await
    }

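    // Return the earliest `stoppedAt` timestamp among completed Backups, i.e. the oldest
    // point-in-time this instance can currently be recovered from.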
    fn process_backups(&self, backup_list: Vec<Backup>) -> Option<DateTime<Utc>> {
        let backup = backup_list
            .iter()
            .filter_map(|backup| backup.status.as_ref())
            .filter(|status| status.phase.as_deref() == Some("completed"))
            .filter_map(|status| status.stopped_at.as_ref())
            .filter_map(|stopped_at_str| DateTime::parse_from_rfc3339(stopped_at_str).ok())
            .map(|dt_with_offset| dt_with_offset.with_timezone(&Utc))
            .min();

        backup
    }

    // get_recovery_time returns the time at which the first recovery will be possible from the
    // oldest completed Backup object in the namespace.
    #[instrument(skip(self, context))]
    pub async fn get_recovery_time(
        &self,
        context: Arc<Context>,
        enable_volume_snapshot: bool,
    ) -> Result<Option<DateTime<Utc>>, Action> {
        let client = context.client.clone();
        let namespace = self.metadata.namespace.as_ref().ok_or_else(|| {
            error!(
                "CoreDB namespace is empty for instance: {}.",
                self.name_any()
            );
            Action::requeue(tokio::time::Duration::from_secs(300))
        })?;
        let cluster_name = self.name_any();
        let backup: Api<Backup> = Api::namespaced(client, namespace);

        // Determine the scheduled backup label suffix based on enable_volume_snapshot
        let scheduled_backup_name = if enable_volume_snapshot {
            format!("{}-snap", cluster_name)
        } else {
            cluster_name.clone()
        };

        // Create label selector with both cluster name and scheduled backup name
        let label_selector = format!(
            "cnpg.io/cluster={},cnpg.io/scheduled-backup={}",
            cluster_name, scheduled_backup_name
        );

        let lp = ListParams::default().labels(&label_selector);
        let backup_list = backup.list(&lp).await.map_err(|e| {
            error!("Error getting backups: {:?}", e);
            Action::requeue(Duration::from_secs(300))
        })?;

        let oldest_backup_time = self.process_backups(backup_list.items);

        Ok(oldest_backup_time)
    }
}

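// Wait condition that is satisfied once the Pod reports ContainersReady=True.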
pub fn is_pod_ready() -> impl Condition<Pod> + 'static {
    move |obj: Option<&Pod>| {
        if let Some(pod) = &obj {
            if let Some(status) = &pod.status {
                if let Some(conds) = &status.conditions {
                    if let Some(pcond) = conds.iter().find(|c| c.type_ == "ContainersReady") {
                        return pcond.status == "True";
                    }
                }
            }
        }
        false
    }
}

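// Wait condition that is satisfied once the `postgres` container reports ready.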
pub fn is_postgres_ready() -> impl Condition<Pod> + 'static {
    move |obj: Option<&Pod>| {
        if let Some(pod) = &obj {
            if let Some(status) = &pod.status {
                if let Some(container_statuses) = &status.container_statuses {
                    for container in container_statuses {
                        if container.name == "postgres" {
                            return container.ready;
                        }
                    }
                }
            }
        }
        false
    }
}

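// Fetch the latest version of the CoreDB resource from the API server.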
#[instrument(skip(ctx, cdb))]
pub async fn get_current_coredb_resource(
    cdb: &CoreDB,
    ctx: Arc<Context>,
) -> Result<CoreDB, Action> {
    let coredb_name = cdb.name_any();
    let namespace = cdb.metadata.namespace.as_ref().ok_or_else(|| {
        error!("Namespace is empty for instance: {}.", &coredb_name);
        Action::requeue(tokio::time::Duration::from_secs(300))
    })?;
    let coredb_api: Api<CoreDB> = Api::namespaced(ctx.client.clone(), namespace);
    let coredb = coredb_api.get(&coredb_name).await.map_err(|e| {
        error!("Error getting CoreDB resource: {:?}", e);
        Action::requeue(Duration::from_secs(10))
    })?;
    Ok(coredb)
}

// Get current config values
pub async fn get_current_config_values(
    cdb: &CoreDB,
    ctx: Arc<Context>,
) -> Result<Vec<PgConfig>, Action> {
    let cfg = list_config_params(cdb, ctx.clone()).await?;
    Ok(cfg)
}

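// Merge-patch the status subresource of the named CoreDB, requeuing shortly on failure.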
pub async fn patch_cdb_status_merge(
    cdb: &Api<CoreDB>,
    name: &str,
    patch: serde_json::Value,
) -> Result<(), Action> {
    let pp = PatchParams {
        field_manager: Some("cntrlr".to_string()),
        ..PatchParams::default()
    };
    let patch_status = Patch::Merge(patch.clone());
    match cdb.patch_status(name, &pp, &patch_status).await {
        Ok(_) => {
            debug!("Successfully updated CoreDB status for {}", name);
            Ok(())
        }
        Err(e) => {
            error!("Error updating CoreDB status for {}: {:?}", name, e);
            Err(Action::requeue(Duration::from_secs(10)))
        }
    }
}

/// Diagnostics to be exposed by the web server
#[derive(Clone, Serialize)]
pub struct Diagnostics {
    #[serde(deserialize_with = "from_ts")]
    pub last_event: DateTime<Utc>,
    #[serde(skip)]
    pub reporter: Reporter,
}
impl Default for Diagnostics {
    fn default() -> Self {
        Self {
            last_event: Utc::now(),
            reporter: "tembo-controller".into(),
        }
    }
}
impl Diagnostics {
    fn recorder(&self, client: Client) -> Recorder {
        Recorder::new(client, self.reporter.clone())
    }
}

/// State shared between the controller and the web server
#[derive(Clone, Default)]
pub struct State {
    /// Diagnostics populated by the reconciler
    diagnostics: Arc<RwLock<Diagnostics>>,
    /// Metrics
    metrics: Arc<Metrics>,
}

/// State wrapper around the controller outputs for the web server
impl State {
    /// Metrics getter
    pub fn metrics(&self) -> String {
        let mut buffer = String::new();
        let registry = &*self.metrics.registry;
        prometheus_client::encoding::text::encode(&mut buffer, registry).unwrap();
        buffer
    }

    /// State getter
    pub async fn diagnostics(&self) -> Diagnostics {
        self.diagnostics.read().await.clone()
    }

    // Create a Controller Context that can update State
    pub async fn to_context(&self, client: Client) -> Arc<Context> {
        Arc::new(Context {
            client: client.clone(),
            recorder: self.diagnostics.read().await.recorder(client),
            metrics: self.metrics.clone(),
            diagnostics: self.diagnostics.clone(),
        })
    }
}

/// Initialize the controller and shared state (given the crd is installed)
pub async fn run(state: State) {
    let client = Client::try_default()
        .await
        .expect("failed to create kube Client");
    let coredb = Api::<CoreDB>::all(client.clone());
    if let Err(e) = coredb.list(&ListParams::default().limit(1)).await {
        error!("CRD is not queryable; {e:?}. Is the CRD installed?");
        info!("Installation: cargo run --bin crdgen | kubectl apply -f -");
        std::process::exit(1);
    }
    let secret_api = Api::<Secret>::all(client.clone());
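    // Run the controller over all CoreDB objects and the Secrets they own, shutting down
    // cleanly on a termination signal and discarding per-object reconcile results.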
    Controller::new(coredb, watcherConfig::default().any_semantic())
        .owns(secret_api, watcherConfig::default().any_semantic())
        .shutdown_on_signal()
        .run(reconcile, error_policy, state.to_context(client).await)
        .filter_map(|x| async move { std::result::Result::ok(x) })
        .for_each(|_| futures::future::ready(()))
        .await;
}

// Tests rely on fixtures.rs
#[cfg(test)]
mod test {
    use super::{reconcile, Backup, Context, CoreDB};
    use crate::apis::coredb_types::VolumeSnapshot;
    use crate::cloudnativepg::{
        backups::{BackupCluster, BackupSpec, BackupStatus},
        VOLUME_SNAPSHOT_CLASS_NAME,
    };
    use crate::config::Config;
    use crate::controller::{create_volume_snapshot_patch, is_volume_snapshot_update_needed};
    use crate::fixtures::{timeout_after_1s, Scenario};
    use chrono::{DateTime, NaiveDate, Utc};
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
    use std::sync::Arc;

    #[tokio::test]
    async fn new_coredbs_without_finalizers_gets_a_finalizer() {
        let (testctx, fakeserver) = Context::test();
        let coredb = CoreDB::test();
        let mocksrv = fakeserver.run(Scenario::FinalizerCreation(coredb.clone()));
        reconcile(Arc::new(coredb), testctx)
            .await
            .expect("reconciler");
        timeout_after_1s(mocksrv).await;
    }

    #[tokio::test]
    async fn test_process_backups() {
        let coredb = CoreDB::test();
        let backup_name = "test-backup-1".to_string();
        let namespace = "test".to_string();

        let backup_list = vec![Backup {
            metadata: ObjectMeta {
                name: Some(backup_name.clone()),
                namespace: Some(namespace),
                ..Default::default()
            },
            spec: BackupSpec {
                cluster: BackupCluster {
                    name: backup_name.clone(),
                },
                ..Default::default()
            },
            status: Some(BackupStatus {
                phase: Some("completed".to_string()),
                stopped_at: Some("2023-09-19T23:14:00Z".to_string()),
                ..Default::default()
            }),
        }];

        let oldest_backup_time = coredb.process_backups(backup_list);

        let expected_time = NaiveDate::from_ymd_opt(2023, 9, 19)
            .and_then(|date| date.and_hms_opt(23, 14, 0))
            .map(|naive_dt| DateTime::from_naive_utc_and_offset(naive_dt, Utc));
        assert_eq!(oldest_backup_time, expected_time);
    }

    #[tokio::test]
    async fn test_process_backups_multiple_backups() {
        let coredb = CoreDB::test();

        let backup_list = vec![
            Backup {
                metadata: ObjectMeta {
                    name: Some("backup-1".to_string()),
                    namespace: Some("test".to_string()),
                    ..Default::default()
                },
                spec: BackupSpec {
                    cluster: BackupCluster {
                        name: "backup-1".to_string(),
                    },
                    ..Default::default()
                },
                status: Some(BackupStatus {
                    phase: Some("completed".to_string()),
                    stopped_at: Some("2023-09-19T23:14:00Z".to_string()),
                    ..Default::default()
                }),
            },
            Backup {
                metadata: ObjectMeta {
                    name: Some("backup-2".to_string()),
                    namespace: Some("test".to_string()),
                    ..Default::default()
                },
                spec: BackupSpec {
                    cluster: BackupCluster {
                        name: "backup-2".to_string(),
                    },
                    ..Default::default()
                },
                status: Some(BackupStatus {
                    phase: Some("completed".to_string()),
                    stopped_at: Some("2023-09-18T22:12:00Z".to_string()), // This is the oldest
                    ..Default::default()
                }),
            },
            Backup {
                metadata: ObjectMeta {
                    name: Some("backup-3".to_string()),
                    namespace: Some("test".to_string()),
                    ..Default::default()
                },
                spec: BackupSpec {
                    cluster: BackupCluster {
                        name: "backup-3".to_string(),
                    },
                    ..Default::default()
                },
                status: Some(BackupStatus {
                    phase: Some("completed".to_string()),
                    stopped_at: Some("2023-09-19T21:11:00Z".to_string()),
                    ..Default::default()
                }),
            },
            Backup {
                metadata: ObjectMeta {
                    name: Some("backup-4".to_string()),
                    namespace: Some("test".to_string()),
                    ..Default::default()
                },
                spec: BackupSpec {
                    cluster: BackupCluster {
                        name: "backup-4".to_string(),
                    },
                    ..Default::default()
                },
                status: Some(BackupStatus {
                    phase: Some("failed".to_string()),
                    stopped_at: Some("2023-09-19T21:11:00Z".to_string()),
                    ..Default::default()
                }),
            },
        ];

        let oldest_backup_time = coredb.process_backups(backup_list);

        let expected_time = NaiveDate::from_ymd_opt(2023, 9, 18)
            .and_then(|date| date.and_hms_opt(22, 12, 0))
            .map(|naive_dt| DateTime::from_naive_utc_and_offset(naive_dt, Utc));

        assert_eq!(oldest_backup_time, expected_time);
    }

    #[tokio::test]
    async fn test_process_backups_no_backup() {
        let coredb = CoreDB::test();

        // An empty list to simulate no Backups
        let backup_list: Vec<Backup> = vec![];

        let oldest_backup_time = coredb.process_backups(backup_list);

        // We expect None since there are no Backups
        assert_eq!(oldest_backup_time, None);
    }

    #[test]
    fn test_create_volume_snapshot_patch_enabled() {
        let cfg = Config {
            enable_volume_snapshot: true,
            ..Config::default()
        };

        let expected_volume_snapshot = VolumeSnapshot {
            enabled: true,
            snapshot_class: Some(VOLUME_SNAPSHOT_CLASS_NAME.to_string()),
        };

        let actual_patch = create_volume_snapshot_patch(&cfg);

        // Deserialize the actual_patch into a VolumeSnapshot instance
        let actual_volume_snapshot: VolumeSnapshot =
            serde_json::from_value(actual_patch["spec"]["backup"]["volumeSnapshot"].clone())
                .expect("Failed to deserialize actual_patch into VolumeSnapshot");

        assert_eq!(actual_volume_snapshot, expected_volume_snapshot);
    }

    #[test]
    fn test_create_volume_snapshot_patch_disabled() {
        let cfg = Config {
            enable_volume_snapshot: false,
            ..Config::default()
        };

        let expected_volume_snapshot = VolumeSnapshot {
            enabled: false,
            snapshot_class: None,
        };

        let actual_patch = create_volume_snapshot_patch(&cfg);

        // Deserialize the actual_patch into a VolumeSnapshot instance
        let actual_volume_snapshot: VolumeSnapshot =
            serde_json::from_value(actual_patch["spec"]["backup"]["volumeSnapshot"].clone())
                .expect("Failed to deserialize actual_patch into VolumeSnapshot");

        assert_eq!(actual_volume_snapshot, expected_volume_snapshot);
    }

    #[test]
    fn test_is_volume_snapshot_update_needed() {
        let volume_snapshot_enabled = Some(VolumeSnapshot {
            enabled: true,
            snapshot_class: Some(VOLUME_SNAPSHOT_CLASS_NAME.to_string()),
        });
        let volume_snapshot_disabled = Some(VolumeSnapshot {
            enabled: false,
            snapshot_class: None,
        });

        // Test cases where no update is needed
        assert!(!is_volume_snapshot_update_needed(
            volume_snapshot_enabled.as_ref(),
            true
        ));
        assert!(!is_volume_snapshot_update_needed(
            volume_snapshot_disabled.as_ref(),
            false
        ));
        assert!(!is_volume_snapshot_update_needed(None, false));

        // Test cases where an update is needed
        assert!(is_volume_snapshot_update_needed(
            volume_snapshot_enabled.as_ref(),
            false
        ));
        assert!(is_volume_snapshot_update_needed(
            volume_snapshot_disabled.as_ref(),
            true
        ));
        assert!(is_volume_snapshot_update_needed(None, true));
    }

    // To test the error_policy function, mock the client and Context so the API returns a 429 error code
    use crate::{error_policy, Error};
    use bytes::Bytes;
    use futures::pin_mut;
    use http::{Request, Response, StatusCode};
    use http_body_util::Full;
    use k8s_openapi::api::core::v1::Pod;
    use kube::runtime::events::Recorder;
    use kube::{api::Api, client::Body, Client};
    use serde_json::json;
    use tower_test::mock;

    #[tokio::test]
    async fn test_error_policy_429() {
        // setup a test CoreDB object
        let coredb = CoreDB::test();

        // mock the Kubernetes client and setup Context
        let (mock_service, handle) = mock::pair::<Request<Body>, Response<Full<Bytes>>>();
        let client = Client::new(mock_service, "default".to_string());
        let ctx = Arc::new(Context {
            client: client.clone(),
            metrics: Default::default(),
            diagnostics: Default::default(),
            recorder: Recorder::new(client.clone(), "tembo-controller".into()),
        });

        // setup the mock response: 429 Too Many Requests
        let spawned = tokio::spawn(async move {
            pin_mut!(handle);
            if let Some((_request, send)) = handle.next_request().await {
                // We don't check the specifics of the request here, focusing on the response
                send.send_response(
                    Response::builder()
                        .status(StatusCode::TOO_MANY_REQUESTS)
                        .body(Full::new(Bytes::from(
                            json!({
                                "kind": "Status",
                                "apiVersion": "v1",
                                "metadata": {},
                                "status": "Failure",
                                "message": "Too Many Requests",
                                "reason": "TooManyRequests",
                                "code": 429
                            })
                            .to_string(),
                        )))
                        .unwrap(),
                );
            }
        });

        // Issue a Pod GET against the mocked Kubernetes API
        let pod_api: Api<Pod> = Api::namespaced(ctx.client.clone(), "default");
        let err = pod_api.get("test-pod").await.err().unwrap();

        // Convert the kube error into the controller's error type, as the reconcile loop would
        let custom_error = Error::from(err);

        // Now we simulate calling the error_policy function with this error
        let action = error_policy(Arc::new(coredb), &custom_error, ctx);
        let action_str = format!("{:?}", action);

        println!("Action: {:?}", action);

        // Use regular expressions to extract the duration from the action string
        let re = regex::Regex::new(r"requeue_after: Some\((\d+)s\)").unwrap();
        if let Some(captures) = re.captures(&action_str) {
            let duration_secs = captures[1].parse::<u64>().unwrap();
            assert!((60..=180).contains(&duration_secs));
        } else {
            panic!("Unexpected action format: {}", action_str);
        }

        spawned.await.unwrap();
    }

    #[tokio::test]
    async fn test_error_policy_non_429() {
        // setup a test CoreDB object
        let coredb = CoreDB::test();

        // mock the Kubernetes client and setup Context
        let (mock_service, handle) = mock::pair::<Request<Body>, Response<Full<Bytes>>>();
        let client = Client::new(mock_service, "default".to_string());
        let ctx = Arc::new(Context {
            client: client.clone(),
            metrics: Default::default(),
            diagnostics: Default::default(),
            recorder: Recorder::new(client.clone(), "tembo-controller".into()),
        });

        // setup the mock response: 404 Not Found
        let spawned = tokio::spawn(async move {
            pin_mut!(handle);
            if let Some((_request, send)) = handle.next_request().await {
                send.send_response(
                    Response::builder()
                        .status(StatusCode::NOT_FOUND)
                        .body(Full::new(Bytes::from(
                            json!({
                                "kind": "Status",
                                "apiVersion": "v1",
                                "metadata": {},
                                "status": "Failure",
                                "message": "Not Found",
                                "reason": "NotFound",
                                "code": 404
                            })
                            .to_string(),
                        )))
                        .unwrap(),
                );
            }
        });

        // Issue a Pod GET against the mocked Kubernetes API
        let pod_api: Api<Pod> = Api::namespaced(ctx.client.clone(), "default");
        let err = pod_api.get("test-pod").await.err().unwrap();

        // Convert the kube error into the controller's error type, as the reconcile loop would
        let custom_error = Error::from(err);

        // Now we simulate calling the error_policy function with this error
        let action = error_policy(Arc::new(coredb), &custom_error, ctx);
        let action_str = format!("{:?}", action);

        println!("Action: {:?}", action);

        // Assert that the action is a requeue with a duration of 5 minutes (300 seconds)
        let re = regex::Regex::new(r"requeue_after: Some\((\d+)s\)").unwrap();
        if let Some(captures) = re.captures(&action_str) {
            let duration_secs = captures[1].parse::<u64>().unwrap();
            assert_eq!(duration_secs, 300);
        } else {
            panic!("Unexpected action format: {}", action_str);
        }

        spawned.await.unwrap();
    }
}