// Path: controller/cloudnativepg/
cnpg_utils.rs1pub use crate::{
2 apis::coredb_types::CoreDB,
3 cloudnativepg::backups::Backup,
4 cloudnativepg::clusters::{Cluster, ClusterStatusConditionsStatus},
5 cloudnativepg::poolers::Pooler,
6 cloudnativepg::scheduledbackups::ScheduledBackup,
7 controller,
8 extensions::database_queries::is_not_restarting,
9 patch_cdb_status_merge, requeue_normal_with_jitter, Context, RESTARTED_AT,
10};
11use k8s_openapi::apimachinery::pkg::apis::meta::v1::Time;
12use kube::{
13 api::{DeleteParams, ListParams, Patch, PatchParams},
14 runtime::controller::Action,
15 Api, ResourceExt,
16};
17use serde_json::json;
18use std::sync::Arc;
19use tokio::time::Duration;
20use tracing::{debug, error, info, instrument, warn};
21
22#[instrument(skip(cdb, ctx, prev_cluster), fields(trace_id, instance_name = %cdb.name_any()))]
25pub(crate) async fn restart_and_wait_for_restart(
26 cdb: &CoreDB,
27 ctx: Arc<Context>,
28 prev_cluster: Option<&Cluster>,
29) -> Result<(), Action> {
30 if prev_cluster.is_none() {
32 warn!("No previous cluster found for {}", cdb.name_any());
33 return Ok(());
34 }
35
36 let Some(cdb_restarted_at) = cdb.annotations().get(RESTARTED_AT) else {
37 warn!("No restart annotation found for {}", cdb.name_any());
39 return Ok(());
40 };
41
42 let previous_restarted_at =
44 prev_cluster.and_then(|cluster| cluster.annotations().get(RESTARTED_AT));
45
46 let restart_annotation_updated = previous_restarted_at != Some(cdb_restarted_at);
47 debug!(
48 "Restart annotation updated: {} for instance: {}",
49 restart_annotation_updated,
50 cdb.name_any()
51 );
52
53 if restart_annotation_updated {
54 warn!(
55 "Restarting instance: {} with restart annotation: {}",
56 cdb.name_any(),
57 cdb_restarted_at
58 );
59
60 let restart_patch = json!({
61 "metadata": {
62 "annotations": {
63 RESTARTED_AT: cdb_restarted_at,
64 }
65 }
66 });
67
68 patch_cluster_merge(cdb, &ctx, restart_patch).await?;
69 update_coredb_status(cdb, &ctx, false).await?;
70
71 info!(
72 "Updated status.running to false in {}, requeuing 10 seconds",
73 &cdb.name_any()
74 );
75
76 let restart_complete_time = is_not_restarting(cdb, ctx.clone(), "postgres").await?;
77
78 info!(
79 "Restart time is {:?} for instance: {}",
80 restart_complete_time,
81 &cdb.name_any()
82 );
83 }
84
85 let cdb_api: Api<CoreDB> =
86 Api::namespaced(ctx.client.clone(), cdb.metadata.namespace.as_ref().unwrap());
87 let coredb_status = cdb_api.get(&cdb.name_any()).await.map_err(|e| {
88 error!("Error retrieving CoreDB status: {}", e);
89 Action::requeue(Duration::from_secs(300))
90 })?;
91
92 if let Some(status) = coredb_status.status {
93 if !status.running {
94 update_coredb_status(cdb, &ctx, true).await?;
95 info!(
96 "Updated CoreDB status.running to true for instance: {}",
97 &cdb.name_any()
98 );
99 }
100 }
101
102 Ok(())
103}
104
105#[instrument(skip(cdb, ctx), fields(trace_id, instance_name = %cdb.name_any(), running = %running))]
106pub(crate) async fn update_coredb_status(
107 cdb: &CoreDB,
108 ctx: &Arc<Context>,
109 running: bool,
110) -> Result<(), Action> {
111 let name = cdb.name_any();
112 let namespace = cdb.metadata.namespace.as_ref().ok_or_else(|| {
113 error!("Namespace is empty for instance: {}.", name);
114 Action::requeue(Duration::from_secs(300))
115 })?;
116
117 let cdb_api: Api<CoreDB> = Api::namespaced(ctx.client.clone(), namespace);
118 patch_cdb_status_merge(
119 &cdb_api,
120 &name,
121 json!({
122 "status": {
123 "running": running
124 }
125 }),
126 )
127 .await
128}
129
130#[instrument(skip(cdb, ctx), fields(trace_id, instance_name = %cdb.name_any(), patch = %patch))]
132pub async fn patch_cluster_merge(
133 cdb: &CoreDB,
134 ctx: &Arc<Context>,
135 patch: serde_json::Value,
136) -> Result<(), Action> {
137 let name = cdb.name_any();
138 let namespace = cdb.metadata.namespace.as_ref().ok_or_else(|| {
139 error!("Namespace is empty for instance: {}.", name);
140 Action::requeue(Duration::from_secs(300))
141 })?;
142
143 let cluster_api: Api<Cluster> = Api::namespaced(ctx.client.clone(), namespace);
144 let pp = PatchParams::apply("patch_merge");
145 let _ = cluster_api
146 .patch(&name, &pp, &Patch::Merge(&patch))
147 .await
148 .map_err(|e| {
149 error!("Error patching cluster: {}", e);
150 Action::requeue(Duration::from_secs(300))
151 });
152
153 Ok(())
154}
155
156#[instrument(skip(cdb, ctx), fields(trace_id, instance_name = %cdb.name_any(), patch = %patch))]
158pub async fn patch_scheduled_backup_merge(
159 cdb: &CoreDB,
160 ctx: &Arc<Context>,
161 backup_name: &str,
162 patch: serde_json::Value,
163) -> Result<(), Action> {
164 let name = cdb.name_any();
165 let namespace = cdb.metadata.namespace.as_ref().ok_or_else(|| {
166 error!("Namespace is empty for instance: {}.", name);
167 Action::requeue(Duration::from_secs(300))
168 })?;
169
170 let scheduled_backup_api: Api<ScheduledBackup> = Api::namespaced(ctx.client.clone(), namespace);
171 let pp = PatchParams::apply("patch_merge");
172 let _ = scheduled_backup_api
173 .patch(backup_name, &pp, &Patch::Merge(&patch))
174 .await
175 .map_err(|e| {
176 error!("Error patching cluster: {}", e);
177 Action::requeue(Duration::from_secs(300))
178 });
179
180 Ok(())
181}
182
183#[instrument(skip(cdb, ctx), fields(trace_id, instance_name = %cdb.name_any(), patch = %patch))]
185pub async fn patch_pooler_merge(
186 cdb: &CoreDB,
187 ctx: &Arc<Context>,
188 patch: serde_json::Value,
189) -> Result<(), Action> {
190 let name = cdb.name_any() + "-pooler";
191 let namespace = cdb.metadata.namespace.as_ref().ok_or_else(|| {
192 error!("Namespace is empty for instance: {}.", name);
193 Action::requeue(Duration::from_secs(300))
194 })?;
195
196 let pooler_api: Api<Pooler> = Api::namespaced(ctx.client.clone(), namespace);
197 let pp = PatchParams::apply("patch_merge");
198 let _ = pooler_api
199 .patch(&name, &pp, &Patch::Merge(&patch))
200 .await
201 .map_err(|e| {
202 error!("Error patching cluster: {}", e);
203 Action::requeue(Duration::from_secs(300))
204 });
205
206 Ok(())
207}
208
209#[instrument(skip(cdb), fields(trace_id, instance_name = %cdb.name_any()))]
211pub fn get_pooler_instances(cdb: &CoreDB) -> Option<i32> {
212 Some(if cdb.spec.stop { 0 } else { 1 })
213}
214
215#[instrument(skip(cdb, maybe_cluster, new_spec), fields(trace_id, instance_name = %cdb.name_any()))]
219pub(crate) fn update_restarted_at(
220 cdb: &CoreDB,
221 maybe_cluster: Option<&Cluster>,
222 new_spec: &mut Cluster,
223) -> bool {
224 let Some(cdb_restarted_at) = cdb.annotations().get(RESTARTED_AT) else {
225 return false;
227 };
228
229 let previous_restarted_at =
231 maybe_cluster.and_then(|cluster| cluster.annotations().get(RESTARTED_AT));
232
233 new_spec
236 .metadata
237 .annotations
238 .as_mut()
239 .map(|cluster_annotations| {
240 cluster_annotations.insert(RESTARTED_AT.into(), cdb_restarted_at.to_owned())
241 });
242
243 let restart_annotation_updated = previous_restarted_at != Some(cdb_restarted_at);
244
245 if restart_annotation_updated {
246 let name = new_spec.metadata.name.as_deref().unwrap_or("unknown");
247 info!("restartAt changed for cluster {name}, setting to {cdb_restarted_at}.");
248 }
249
250 restart_annotation_updated
251}
252
253#[instrument(skip(cdb, ctx, cluster) fields(trace_id, instance_name = %cdb.name_any()))]
255pub(crate) async fn patch_cluster(
256 cluster: &Cluster,
257 ctx: Arc<Context>,
258 cdb: &CoreDB,
259) -> Result<(), Action> {
260 let name = cdb.name_any();
261 let namespace = cdb.metadata.namespace.as_ref().ok_or_else(|| {
262 error!("Namespace is empty for instance: {}.", name);
263 Action::requeue(tokio::time::Duration::from_secs(300))
264 })?;
265
266 let pp = PatchParams::apply("cntrlr").force();
268
269 let api: Api<Cluster> = Api::namespaced(ctx.client.clone(), namespace);
271
272 info!("Applying Cluster for instance: {}", &name);
273 let _o = api
274 .patch(&name, &pp, &Patch::Apply(&cluster))
275 .await
276 .map_err(|e| {
277 error!("Error patching Cluster: {}", e);
278 Action::requeue(Duration::from_secs(300))
279 })?;
280
281 Ok(())
282}
283
284#[instrument(skip(cdb, ctx, prev_cluster), fields(trace_id, instance_name = %cdb.name_any()))]
286pub(crate) async fn is_image_updated(
287 cdb: &CoreDB,
288 ctx: Arc<Context>,
289 prev_cluster: Option<&Cluster>,
290) -> Result<(), Action> {
291 if prev_cluster.is_none() {
293 warn!("No previous cluster found for {}", cdb.name_any());
294 return Ok(());
295 }
296
297 if let Some(prev_cluster) = prev_cluster {
299 let prev_image = prev_cluster.spec.image_name.as_deref();
300 let new_image = cdb.spec.image.as_str();
301
302 if let Some(prev_image) = prev_image {
303 if prev_image != new_image {
304 warn!(
305 "Image updated for instance: {} from {} to {}",
306 cdb.name_any(),
307 prev_image,
308 new_image
309 );
310
311 let patch = json!({
313 "spec": {
314 "imageName": new_image
315 }
316 });
317
318 patch_cluster_merge(cdb, &ctx, patch).await?;
320 }
321 }
322 }
323
324 Ok(())
325}
326
327#[instrument(skip(cdb, ctx), fields(trace_id, instance_name = %cdb.name_any()))]
331pub(crate) async fn removed_stalled_backups(
332 cdb: &CoreDB,
333 ctx: &Arc<Context>,
334) -> Result<(), Action> {
335 let name = cdb.name_any();
336 let namespace = cdb.metadata.namespace.as_ref().ok_or_else(|| {
337 error!("Namespace is empty for instance: {}.", name);
338 Action::requeue(Duration::from_secs(300))
339 })?;
340
341 let backup_api: Api<Backup> = Api::namespaced(ctx.client.clone(), namespace);
342
343 let lp = ListParams {
345 label_selector: Some(format!("cnpg.io/cluster={}", name.as_str())),
346 ..ListParams::default()
347 };
348 let backups = backup_api.list(&lp).await.map_err(|e| {
349 error!("Error listing backups: {}", e);
350 Action::requeue(Duration::from_secs(300))
351 })?;
352
353 let stalled_time = Time(chrono::Utc::now() - chrono::Duration::hours(6));
354
355 for backup in &backups.items {
357 if backup.status.is_none() {
358 if let Some(creation_time) = backup.metadata.creation_timestamp.as_ref() {
359 if creation_time < &stalled_time {
360 info!("Deleting stalled backup: {}", backup.name_any());
361 match backup_api
362 .delete(&backup.name_any(), &DeleteParams::default())
363 .await
364 {
365 Ok(_) => {
366 info!("Successfully deleted stalled backup: {}", backup.name_any())
367 }
368 Err(e) => error!(
369 "Failed to delete stalled backup {}: {}",
370 backup.name_any(),
371 e
372 ),
373 }
374 }
375 }
376 }
377 }
378
379 Ok(())
380}