use crate::{
apis::coredb_types::CoreDB,
cloudnativepg::backups::{Backup, BackupMethod},
snapshots::volumesnapshots_crd::VolumeSnapshot,
};
use chrono::{DateTime, Duration, Utc};
use kube::{
api::{Api, DeleteParams, ListParams},
runtime::controller::Action,
Client as KubeClient, ResourceExt,
};
use tracing::{debug, error, info, instrument, warn};
#[instrument(skip(cdb, client), fields(
instance = %cdb.name_any(),
namespace = %cdb.namespace().unwrap_or_default(),
retention_days = %retention_days
))]
pub async fn cleanup_old_volume_snapshots(
cdb: &CoreDB,
client: KubeClient,
retention_days: u64,
) -> Result<(), Action> {
let name = cdb.name_any();
let namespace = cdb.metadata.namespace.as_ref().ok_or_else(|| {
error!("Namespace is empty for instance: {}.", cdb.name_any());
Action::requeue(tokio::time::Duration::from_secs(300))
})?;
let backups_api: Api<Backup> = Api::namespaced(client.clone(), namespace);
let snapshots_api: Api<VolumeSnapshot> = Api::namespaced(client.clone(), namespace);
let cutoff_time = Utc::now() - Duration::days(retention_days as i64);
debug!(
cutoff_time = %cutoff_time,
"Starting cleanup of old volume snapshots for instance: {}", name
);
let all_backups = backups_api
.list(&ListParams::default())
.await
.map_err(|e| {
error!(
error = %e,
namespace = %namespace,
"Failed to list backups"
);
Action::requeue(tokio::time::Duration::from_secs(300))
})?
.items;
let volume_snapshot_backups: Vec<_> = all_backups
.into_iter()
.filter(|b| matches!(b.spec.method.as_ref(), Some(BackupMethod::VolumeSnapshot)))
.collect();
debug!(
backup_count = volume_snapshot_backups.len(),
"Found volume snapshot backups to evaluate for instance: {}", name
);
for backup in volume_snapshot_backups {
if should_delete_backup(&backup, cutoff_time) {
delete_backup_and_snapshot(&backups_api, &snapshots_api, &backup, namespace).await?;
}
}
info!("Completed volume snapshot cleanup for instance: {}", name);
Ok(())
}
#[instrument(skip(backup, cutoff_time), fields(
backup_name = %backup.name_any(),
creation_time = ?backup.metadata.creation_timestamp
))]
fn should_delete_backup(backup: &Backup, cutoff_time: DateTime<Utc>) -> bool {
backup
.metadata
.creation_timestamp
.as_ref()
.map(|ts| ts.0 < cutoff_time)
.unwrap_or(false)
}
#[instrument(skip(backups_api, snapshots_api, backup), fields(
backup_name = %backup.name_any(),
namespace = %namespace
))]
async fn delete_backup_and_snapshot(
backups_api: &Api<Backup>,
snapshots_api: &Api<VolumeSnapshot>,
backup: &Backup,
namespace: &str,
) -> Result<(), Action> {
let backup_name = backup.metadata.name.as_deref().unwrap_or("unknown");
match backups_api
.delete(backup_name, &DeleteParams::default())
.await
{
Ok(_) => {
info!(
"Deleted snapshot Backup '{}' for instance '{}', in namespace '{}'",
backup_name, namespace, namespace
);
}
Err(e) => {
warn!(
"Failed to delete snapshot Backup '{}' for instance '{}' in namespace '{}': {}",
backup_name, namespace, namespace, e
);
return Err(Action::requeue(tokio::time::Duration::from_secs(300)));
}
}
if let Some(status) = &backup.status {
if let Some(snapshot_name) = &status.backup_name {
match snapshots_api
.delete(snapshot_name, &DeleteParams::default())
.await
{
Ok(_) => {
info!(
"Deleted VolumeSnapshot '{}' for instance '{}' in namespace '{}'",
snapshot_name, namespace, namespace
);
}
Err(e) => {
warn!(
"Failed to delete VolumeSnapshot '{}' for instance '{}' in namespace '{}': {}",
snapshot_name, namespace, namespace, e
);
}
}
}
}
Ok(())
}