use std::{collections::HashSet, sync::Arc};
use chrono::{DateTime, Utc};
use futures::{StreamExt as _, TryStreamExt as _, stream};
use tokio::pin;
use tracing::instrument;
use crate::{
asset_manager::AssetManager,
format::SnapshotId,
ops::gc::{ExpireResult, ExpiredRefAction, GCError, GCResult},
refs::{Ref, delete_branch, delete_tag, list_refs},
};
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ExpireRefResult {
NothingToDo {
ref_is_expired: bool,
},
Done {
released_snapshots: HashSet<SnapshotId>,
edited_snapshot: SnapshotId,
ref_is_expired: bool,
},
}
#[instrument(skip(asset_manager))]
pub async fn expire_ref(
asset_manager: Arc<AssetManager>,
reference: &Ref,
older_than: DateTime<Utc>,
) -> GCResult<ExpireRefResult> {
let snap_id = reference
.fetch(asset_manager.storage().as_ref(), asset_manager.storage_settings())
.await
.map(|ref_data| ref_data.snapshot)?;
tracing::info!("Starting expiration at ref {}", snap_id);
let mut editable_snap = snap_id.clone();
let mut root = snap_id.clone();
let mut released = HashSet::new();
#[expect(deprecated)]
let ancestry =
Arc::clone(&asset_manager).snapshot_ancestry_v1(&snap_id).await?.peekable();
pin!(ancestry);
let mut ref_is_expired = false;
if let Some(Ok(info)) = ancestry.as_mut().peek().await
&& info.flushed_at()? < older_than
{
tracing::debug!(flushed_at = %info.flushed_at()?, "Ref flagged as expired");
ref_is_expired = true;
}
while let Some(parent) = ancestry.try_next().await? {
let flushed_at = parent.flushed_at()?;
let parent_id = parent.id();
if parent.flushed_at()? >= older_than {
tracing::debug!(snap = %parent_id, flushed_at = %flushed_at, "Processing non expired snapshot");
editable_snap = parent_id;
} else {
tracing::debug!(snap = %parent_id, flushed_at = %flushed_at, "Processing expired snapshot");
released.insert(parent_id.clone());
root = parent_id;
}
}
released.remove(&root);
let editable_snap = asset_manager.fetch_snapshot(&editable_snap).await?;
#[expect(deprecated)]
let old_parent_id = editable_snap.parent_id();
if editable_snap.id() == root || Some(&root) == old_parent_id.as_ref()
|| root == snap_id
{
tracing::info!("Nothing to expire for this ref");
return Ok(ExpireRefResult::NothingToDo { ref_is_expired });
}
let root = asset_manager.fetch_snapshot(&root).await?;
#[expect(deprecated)]
{
assert!(editable_snap.parent_id().is_some());
}
#[expect(deprecated)]
{
assert!(root.parent_id().is_none());
}
tracing::info!(root = %root.id(), editable_snap=%editable_snap.id(), "Expiration needed for this ref");
#[expect(deprecated)]
let new_snapshot = Arc::new(root.adopt(&editable_snap)?);
asset_manager.write_snapshot(new_snapshot).await?;
tracing::info!("Snapshot overwritten");
Ok(ExpireRefResult::Done {
released_snapshots: released,
edited_snapshot: editable_snap.id().clone(),
ref_is_expired,
})
}
#[instrument(skip(asset_manager))]
pub async fn expire(
asset_manager: Arc<AssetManager>,
older_than: DateTime<Utc>,
expired_branches: ExpiredRefAction,
expired_tags: ExpiredRefAction,
) -> GCResult<ExpireResult> {
let storage = asset_manager.storage().as_ref();
let storage_settings = asset_manager.storage_settings();
let all_refs = stream::iter(list_refs(storage, storage_settings).await?);
let asset_manager = Arc::clone(&asset_manager);
all_refs
.then(move |r| {
let asset_manager = Arc::clone(&asset_manager);
async move {
let ref_result = expire_ref(asset_manager, &r, older_than).await?;
Ok::<(Ref, ExpireRefResult), GCError>((r, ref_result))
}
})
.try_fold(ExpireResult::default(), |mut result, (r, ref_result)| async move {
let ref_is_expired = match ref_result {
ExpireRefResult::Done {
released_snapshots,
edited_snapshot,
ref_is_expired,
} => {
result.released_snapshots.extend(released_snapshots.into_iter());
result.edited_snapshots.insert(edited_snapshot);
ref_is_expired
}
ExpireRefResult::NothingToDo { ref_is_expired } => ref_is_expired,
};
if ref_is_expired {
match &r {
Ref::Tag(name) => {
if expired_tags == ExpiredRefAction::Delete {
tracing::info!(name, "Deleting expired tag");
delete_tag(storage, storage_settings, name.as_str())
.await
.map_err(GCError::Ref)?;
result.deleted_refs.insert(r);
}
}
Ref::Branch(name) => {
if expired_branches == ExpiredRefAction::Delete
&& name != Ref::DEFAULT_BRANCH
{
tracing::info!(name, "Deleting expired branch");
delete_branch(storage, storage_settings, name.as_str())
.await
.map_err(GCError::Ref)?;
result.deleted_refs.insert(r);
}
}
}
}
Ok(result)
})
.await
}