controller/cloudnativepg/cnpg_utils.rs

pub use crate::{
    apis::coredb_types::CoreDB,
    cloudnativepg::backups::Backup,
    cloudnativepg::clusters::{Cluster, ClusterStatusConditionsStatus},
    cloudnativepg::poolers::Pooler,
    cloudnativepg::scheduledbackups::ScheduledBackup,
    controller,
    extensions::database_queries::is_not_restarting,
    patch_cdb_status_merge, requeue_normal_with_jitter, Context, RESTARTED_AT,
};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::Time;
use kube::{
    api::{DeleteParams, ListParams, Patch, PatchParams},
    runtime::controller::Action,
    Api, ResourceExt,
};
use serde_json::json;
use std::sync::Arc;
use tokio::time::Duration;
use tracing::{debug, error, info, instrument, warn};

// restart_and_wait_for_restart adds the restart annotation to a CNPG Cluster
// and waits for the restart to complete.
#[instrument(skip(cdb, ctx, prev_cluster), fields(trace_id, instance_name = %cdb.name_any()))]
pub(crate) async fn restart_and_wait_for_restart(
    cdb: &CoreDB,
    ctx: Arc<Context>,
    prev_cluster: Option<&Cluster>,
) -> Result<(), Action> {
    // Check if prev_cluster is None, if so return early
    if prev_cluster.is_none() {
        warn!("No previous cluster found for {}", cdb.name_any());
        return Ok(());
    }

    let Some(cdb_restarted_at) = cdb.annotations().get(RESTARTED_AT) else {
        // No need to update the annotation if it's not present in the CoreDB
        warn!("No restart annotation found for {}", cdb.name_any());
        return Ok(());
    };

    // Get the previous value of the annotation, if any
    let previous_restarted_at =
        prev_cluster.and_then(|cluster| cluster.annotations().get(RESTARTED_AT));

    let restart_annotation_updated = previous_restarted_at != Some(cdb_restarted_at);
    debug!(
        "Restart annotation updated: {} for instance: {}",
        restart_annotation_updated,
        cdb.name_any()
    );

    if restart_annotation_updated {
        warn!(
            "Restarting instance: {} with restart annotation: {}",
            cdb.name_any(),
            cdb_restarted_at
        );

        let restart_patch = json!({
            "metadata": {
                "annotations": {
                    RESTARTED_AT: cdb_restarted_at,
                }
            }
        });

        patch_cluster_merge(cdb, &ctx, restart_patch).await?;
        update_coredb_status(cdb, &ctx, false).await?;

        info!(
            "Updated status.running to false in {}, requeuing 10 seconds",
            &cdb.name_any()
        );

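        // is_not_restarting is expected to return the restart completion time once the Cluster
        // reports the restart has finished; otherwise it errors with a requeue Action, so the `?`
        // below keeps requeuing the reconcile until the restart is done.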
        let restart_complete_time = is_not_restarting(cdb, ctx.clone(), "postgres").await?;

        info!(
            "Restart time is {:?} for instance: {}",
            restart_complete_time,
            &cdb.name_any()
        );
    }

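    // Whether or not a restart was triggered above, re-read the CoreDB and flip
    // status.running back to true if it is currently false.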
    let cdb_api: Api<CoreDB> =
        Api::namespaced(ctx.client.clone(), cdb.metadata.namespace.as_ref().unwrap());
    let coredb_status = cdb_api.get(&cdb.name_any()).await.map_err(|e| {
        error!("Error retrieving CoreDB status: {}", e);
        Action::requeue(Duration::from_secs(300))
    })?;

    if let Some(status) = coredb_status.status {
        if !status.running {
            update_coredb_status(cdb, &ctx, true).await?;
            info!(
                "Updated CoreDB status.running to true for instance: {}",
                &cdb.name_any()
            );
        }
    }

    Ok(())
}

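// update_coredb_status patch-merges the CoreDB status subresource, setting status.running to the given value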
#[instrument(skip(cdb, ctx), fields(trace_id, instance_name = %cdb.name_any(), running = %running))]
pub(crate) async fn update_coredb_status(
    cdb: &CoreDB,
    ctx: &Arc<Context>,
    running: bool,
) -> Result<(), Action> {
    let name = cdb.name_any();
    let namespace = cdb.metadata.namespace.as_ref().ok_or_else(|| {
        error!("Namespace is empty for instance: {}.", name);
        Action::requeue(Duration::from_secs(300))
    })?;

    let cdb_api: Api<CoreDB> = Api::namespaced(ctx.client.clone(), namespace);
    patch_cdb_status_merge(
        &cdb_api,
        &name,
        json!({
            "status": {
                "running": running
            }
        }),
    )
    .await
}

// patch_cluster_merge takes a CoreDB, a context and a serde_json::Value and patch-merges the Cluster with the new spec
#[instrument(skip(cdb, ctx), fields(trace_id, instance_name = %cdb.name_any(), patch = %patch))]
pub async fn patch_cluster_merge(
    cdb: &CoreDB,
    ctx: &Arc<Context>,
    patch: serde_json::Value,
) -> Result<(), Action> {
    let name = cdb.name_any();
    let namespace = cdb.metadata.namespace.as_ref().ok_or_else(|| {
        error!("Namespace is empty for instance: {}.", name);
        Action::requeue(Duration::from_secs(300))
    })?;

    let cluster_api: Api<Cluster> = Api::namespaced(ctx.client.clone(), namespace);
    let pp = PatchParams::apply("patch_merge");
    cluster_api
        .patch(&name, &pp, &Patch::Merge(&patch))
        .await
        .map_err(|e| {
            error!("Error patching cluster: {}", e);
            Action::requeue(Duration::from_secs(300))
        })?;

    Ok(())
}

// patch_scheduled_backup_merge takes a CoreDB, a context, a backup name and a serde_json::Value and patch-merges the ScheduledBackup with the new spec
#[instrument(skip(cdb, ctx), fields(trace_id, instance_name = %cdb.name_any(), patch = %patch))]
pub async fn patch_scheduled_backup_merge(
    cdb: &CoreDB,
    ctx: &Arc<Context>,
    backup_name: &str,
    patch: serde_json::Value,
) -> Result<(), Action> {
    let name = cdb.name_any();
    let namespace = cdb.metadata.namespace.as_ref().ok_or_else(|| {
        error!("Namespace is empty for instance: {}.", name);
        Action::requeue(Duration::from_secs(300))
    })?;

    let scheduled_backup_api: Api<ScheduledBackup> = Api::namespaced(ctx.client.clone(), namespace);
    let pp = PatchParams::apply("patch_merge");
    scheduled_backup_api
        .patch(backup_name, &pp, &Patch::Merge(&patch))
        .await
        .map_err(|e| {
            error!("Error patching scheduled backup: {}", e);
            Action::requeue(Duration::from_secs(300))
        })?;

    Ok(())
}

// patch_pooler_merge takes a CoreDB, a context and a serde_json::Value and patch-merges the Pooler with the new spec
#[instrument(skip(cdb, ctx), fields(trace_id, instance_name = %cdb.name_any(), patch = %patch))]
pub async fn patch_pooler_merge(
    cdb: &CoreDB,
    ctx: &Arc<Context>,
    patch: serde_json::Value,
) -> Result<(), Action> {
    let name = cdb.name_any() + "-pooler";
    let namespace = cdb.metadata.namespace.as_ref().ok_or_else(|| {
        error!("Namespace is empty for instance: {}.", name);
        Action::requeue(Duration::from_secs(300))
    })?;

    let pooler_api: Api<Pooler> = Api::namespaced(ctx.client.clone(), namespace);
    let pp = PatchParams::apply("patch_merge");
    pooler_api
        .patch(&name, &pp, &Patch::Merge(&patch))
        .await
        .map_err(|e| {
            error!("Error patching pooler: {}", e);
            Action::requeue(Duration::from_secs(300))
        })?;

    Ok(())
}

// get_pooler_instances takes a CoreDB and returns the desired number of Pooler instances:
// Some(0) when the instance is stopped (hibernated), otherwise Some(1)
#[instrument(skip(cdb), fields(trace_id, instance_name = %cdb.name_any()))]
pub fn get_pooler_instances(cdb: &CoreDB) -> Option<i32> {
    Some(if cdb.spec.stop { 0 } else { 1 })
}

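// update_restarted_at forwards the RESTARTED_AT annotation from the CoreDB onto the new Cluster
// spec, returning true when its value differs from the previous cluster's annotation.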
// cdb: the CoreDB object
// maybe_cluster: Option<&Cluster> of the current CNPG cluster, if it exists
// new_spec: the new Cluster spec to be applied
#[instrument(skip(cdb, maybe_cluster, new_spec), fields(trace_id, instance_name = %cdb.name_any()))]
pub(crate) fn update_restarted_at(
    cdb: &CoreDB,
    maybe_cluster: Option<&Cluster>,
    new_spec: &mut Cluster,
) -> bool {
    let Some(cdb_restarted_at) = cdb.annotations().get(RESTARTED_AT) else {
        // No need to update the annotation if it's not present in the CoreDB
        return false;
    };

    // Remember the previous value of the annotation, if any
    let previous_restarted_at =
        maybe_cluster.and_then(|cluster| cluster.annotations().get(RESTARTED_AT));

    // Forward the `restartedAt` annotation from the CoreDB over to the CNPG cluster,
    // whether or not its value has changed.
    if let Some(cluster_annotations) = new_spec.metadata.annotations.as_mut() {
        cluster_annotations.insert(RESTARTED_AT.into(), cdb_restarted_at.to_owned());
    }

    let restart_annotation_updated = previous_restarted_at != Some(cdb_restarted_at);

    if restart_annotation_updated {
        let name = new_spec.metadata.name.as_deref().unwrap_or("unknown");
        info!("restartedAt changed for cluster {name}, setting to {cdb_restarted_at}.");
    }

    restart_annotation_updated
}

// patch_cluster is an async function that takes a CNPG cluster and patch-applies it with the new spec
#[instrument(skip(cdb, ctx, cluster), fields(trace_id, instance_name = %cdb.name_any()))]
pub(crate) async fn patch_cluster(
    cluster: &Cluster,
    ctx: Arc<Context>,
    cdb: &CoreDB,
) -> Result<(), Action> {
    let name = cdb.name_any();
    let namespace = cdb.metadata.namespace.as_ref().ok_or_else(|| {
        error!("Namespace is empty for instance: {}.", name);
        Action::requeue(tokio::time::Duration::from_secs(300))
    })?;

    // Set up the patch parameters: a forced server-side apply with field manager "cntrlr"
    let pp = PatchParams::apply("cntrlr").force();

    // Set up the Cluster API
    let api: Api<Cluster> = Api::namespaced(ctx.client.clone(), namespace);

    info!("Applying Cluster for instance: {}", &name);
    let _o = api
        .patch(&name, &pp, &Patch::Apply(&cluster))
        .await
        .map_err(|e| {
            error!("Error patching Cluster: {}", e);
            Action::requeue(Duration::from_secs(300))
        })?;

    Ok(())
}

// is_image_updated takes a CoreDB, a Context and a Cluster and checks whether the image needs to be updated
#[instrument(skip(cdb, ctx, prev_cluster), fields(trace_id, instance_name = %cdb.name_any()))]
pub(crate) async fn is_image_updated(
    cdb: &CoreDB,
    ctx: Arc<Context>,
    prev_cluster: Option<&Cluster>,
) -> Result<(), Action> {
    // Check if prev_cluster is None, if so return early
    if prev_cluster.is_none() {
        warn!("No previous cluster found for {}", cdb.name_any());
        return Ok(());
    }

    // Check if the image is being updated: compare prev_cluster's spec.imageName against cdb.spec.image
    if let Some(prev_cluster) = prev_cluster {
        let prev_image = prev_cluster.spec.image_name.as_deref();
        let new_image = cdb.spec.image.as_str();

        if let Some(prev_image) = prev_image {
            if prev_image != new_image {
                warn!(
                    "Image updated for instance: {} from {} to {}",
                    cdb.name_any(),
                    prev_image,
                    new_image
                );

                // Create the merge patch
                let patch = json!({
                    "spec": {
                        "imageName": new_image
                    }
                });

                // Update the Cluster with the patch
                patch_cluster_merge(cdb, &ctx, patch).await?;
            }
        }
    }

    Ok(())
}

// removed_stalled_backups takes a CoreDB and a Context and removes any stalled
// backups. A backup is considered stalled if it's older than 6 hours and does not have a status set.
// If a status is missing, the backup was never started nor will it ever start.
#[instrument(skip(cdb, ctx), fields(trace_id, instance_name = %cdb.name_any()))]
pub(crate) async fn removed_stalled_backups(
    cdb: &CoreDB,
    ctx: &Arc<Context>,
) -> Result<(), Action> {
    let name = cdb.name_any();
    let namespace = cdb.metadata.namespace.as_ref().ok_or_else(|| {
        error!("Namespace is empty for instance: {}.", name);
        Action::requeue(Duration::from_secs(300))
    })?;

    let backup_api: Api<Backup> = Api::namespaced(ctx.client.clone(), namespace);

    // List all backups for the cluster
    let lp = ListParams {
        label_selector: Some(format!("cnpg.io/cluster={}", name.as_str())),
        ..ListParams::default()
    };
    let backups = backup_api.list(&lp).await.map_err(|e| {
        error!("Error listing backups: {}", e);
        Action::requeue(Duration::from_secs(300))
    })?;

    let stalled_time = Time(chrono::Utc::now() - chrono::Duration::hours(6));

    // Delete backups that do not have a status set and are older than 6 hours
    for backup in &backups.items {
        if backup.status.is_none() {
            if let Some(creation_time) = backup.metadata.creation_timestamp.as_ref() {
                if creation_time < &stalled_time {
                    info!("Deleting stalled backup: {}", backup.name_any());
                    match backup_api
                        .delete(&backup.name_any(), &DeleteParams::default())
                        .await
                    {
                        Ok(_) => {
                            info!("Successfully deleted stalled backup: {}", backup.name_any())
                        }
                        Err(e) => error!(
                            "Failed to delete stalled backup {}: {}",
                            backup.name_any(),
                            e
                        ),
                    }
                }
            }
        }
    }

    Ok(())
}