use std::{
collections::{HashMap, HashSet},
future::ready,
num::{NonZeroU16, NonZeroUsize},
sync::{Arc, Mutex},
};
use backon::{BackoffBuilder as _, ExponentialBuilder, Retryable as _};
use chrono::{DateTime, Utc};
use futures::{Stream, StreamExt as _, TryStream, TryStreamExt as _, stream};
use itertools::Itertools as _;
use tokio::task::{self};
use tracing::{debug, error, info, instrument, trace};
use crate::{
StorageError,
asset_manager::AssetManager,
config::RepoUpdateRetryConfig,
format::{
ChunkId, FileTypeTag, IcechunkFormatError, ManifestId, ObjectId, SnapshotId,
format_constants::SpecVersionBin,
manifest::{ChunkPayload, Manifest},
repo_info::{RepoAvailability, RepoInfo, UpdateInfo, UpdateType},
snapshot::{ManifestFileInfo, Snapshot, SnapshotInfo},
},
ops::pointed_snapshots,
refs::{Ref, RefError},
repository::{RepositoryError, RepositoryErrorKind, RepositoryResult},
storage::{self, DeleteObjectsResult, ListInfo},
stream_utils::{StreamLimiter, try_unique_stream},
};
use icechunk_types::{ICResultExt as _, error::ICResultCtxExt as _};
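/// How to treat dangling objects of a given kind: keep them, or delete those
/// created before the given timestamp.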
#[derive(Debug, PartialEq, Eq)]
pub enum Action {
Keep,
DeleteIfCreatedBefore(DateTime<Utc>),
}
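/// Configuration for a garbage collection run.
///
/// Each `dangling_*` field selects the [`Action`] applied to objects of that kind that are
/// not reachable from a retained snapshot. `extra_roots` adds snapshots to the GC roots
/// besides those reachable from refs. The `max_*` fields bound memory and concurrency while
/// scanning snapshots and manifests, and `dry_run` computes the summary without deleting
/// anything.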
#[derive(Debug)]
pub struct GCConfig {
extra_roots: HashSet<SnapshotId>,
dangling_chunks: Action,
dangling_manifests: Action,
dangling_attributes: Action,
dangling_transaction_logs: Action,
dangling_snapshots: Action,
max_snapshots_in_memory: NonZeroU16,
max_compressed_manifest_mem_bytes: NonZeroUsize,
max_concurrent_manifest_fetches: NonZeroU16,
dry_run: bool,
}
impl GCConfig {
#[expect(clippy::too_many_arguments)]
pub fn new(
extra_roots: HashSet<SnapshotId>,
dangling_chunks: Action,
dangling_manifests: Action,
dangling_attributes: Action,
dangling_transaction_logs: Action,
dangling_snapshots: Action,
max_snapshots_in_memory: NonZeroU16,
max_compressed_manifest_mem_bytes: NonZeroUsize,
max_concurrent_manifest_fetches: NonZeroU16,
dry_run: bool,
) -> Self {
GCConfig {
extra_roots,
dangling_chunks,
dangling_manifests,
dangling_attributes,
dangling_transaction_logs,
dangling_snapshots,
max_snapshots_in_memory,
max_compressed_manifest_mem_bytes,
max_concurrent_manifest_fetches,
dry_run,
}
}
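/// Build a config that deletes every kind of dangling object: chunks created before
/// `chunks_age`, and manifests, attributes, transaction logs and snapshots created before
/// `metadata_age`.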
pub fn clean_all(
chunks_age: DateTime<Utc>,
metadata_age: DateTime<Utc>,
extra_roots: Option<HashSet<SnapshotId>>,
max_snapshots_in_memory: NonZeroU16,
max_compressed_manifest_mem_bytes: NonZeroUsize,
max_concurrent_manifest_fetches: NonZeroU16,
dry_run: bool,
) -> Self {
use Action::DeleteIfCreatedBefore as D;
Self::new(
extra_roots.unwrap_or_default(),
D(chunks_age),
D(metadata_age),
D(metadata_age),
D(metadata_age),
D(metadata_age),
max_snapshots_in_memory,
max_compressed_manifest_mem_bytes,
max_concurrent_manifest_fetches,
dry_run,
)
}
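/// Returns true if at least one object kind is configured for deletion rather than `Keep`.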
pub fn action_needed(&self) -> bool {
[
&self.dangling_chunks,
&self.dangling_manifests,
&self.dangling_attributes,
&self.dangling_transaction_logs,
&self.dangling_snapshots,
]
.into_iter()
.any(|action| action != &Action::Keep)
}
pub fn deletes_chunks(&self) -> bool {
self.dangling_chunks != Action::Keep
}
pub fn deletes_manifests(&self) -> bool {
self.dangling_manifests != Action::Keep
}
pub fn deletes_attributes(&self) -> bool {
self.dangling_attributes != Action::Keep
}
pub fn deletes_transaction_logs(&self) -> bool {
self.dangling_transaction_logs != Action::Keep
}
pub fn deletes_snapshots(&self) -> bool {
self.dangling_snapshots != Action::Keep
}
fn must_delete_chunk(&self, chunk: &ListInfo<ChunkId>) -> bool {
match self.dangling_chunks {
Action::DeleteIfCreatedBefore(before) => chunk.created_at < before,
_ => false,
}
}
fn must_delete_manifest(&self, manifest: &ListInfo<ManifestId>) -> bool {
match self.dangling_manifests {
Action::DeleteIfCreatedBefore(before) => manifest.created_at < before,
_ => false,
}
}
fn must_delete_snapshot(&self, snapshot: &ListInfo<SnapshotId>) -> bool {
match self.dangling_snapshots {
Action::DeleteIfCreatedBefore(before) => snapshot.created_at < before,
_ => false,
}
}
fn must_delete_transaction_log(&self, tx_log: &ListInfo<SnapshotId>) -> bool {
match self.dangling_transaction_logs {
Action::DeleteIfCreatedBefore(before) => tx_log.created_at < before,
_ => false,
}
}
}
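/// Counts of deleted objects and bytes reported by a GC run; in a dry run these are the
/// deletions that would have happened.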
#[derive(Debug, PartialEq, Eq, Default)]
pub struct GCSummary {
pub bytes_deleted: u64,
pub chunks_deleted: u64,
pub manifests_deleted: u64,
pub snapshots_deleted: u64,
pub attributes_deleted: u64,
pub transaction_logs_deleted: u64,
}
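/// Errors produced by garbage collection and expiration.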
#[derive(Debug, thiserror::Error)]
pub enum GCError {
#[error("ref error {0}")]
Ref(#[from] RefError),
#[error("repository error {0}")]
Repository(#[from] RepositoryError),
#[error("format error {0}")]
FormatError(#[from] IcechunkFormatError),
#[error("storage error {0}")]
StorageError(#[from] StorageError),
}
pub type GCResult<A> = Result<A, GCError>;
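/// Mark `snap` as retained and return a stream over the manifest files it references.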
async fn snapshot_retained(
keep_snapshots: Arc<Mutex<HashSet<SnapshotId>>>,
snap: Arc<Snapshot>,
) -> RepositoryResult<impl TryStream<Ok = ManifestFileInfo, Error = RepositoryError>> {
keep_snapshots
.lock()
.map_err(|_| {
RepositoryErrorKind::Other("can't lock retained snapshots mutex".to_string())
})
.capture()?
.insert(snap.id());
let files: Vec<ManifestFileInfo> = snap.manifest_files().try_collect().inject()?;
Ok(stream::iter(files.into_iter().map(Ok)))
}
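/// Mark the manifest described by `minfo` as retained and fetch its contents so its chunk
/// references can be scanned.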
async fn manifest_retained(
keep_manifests: Arc<Mutex<HashSet<ManifestId>>>,
asset_manager: Arc<AssetManager>,
minfo: ManifestFileInfo,
) -> RepositoryResult<(Arc<Manifest>, ManifestFileInfo)> {
keep_manifests
.lock()
.map_err(|_| {
RepositoryErrorKind::Other("can't lock retained manifests mutex".to_string())
})
.capture()?
.insert(minfo.id.clone());
let manifest = asset_manager.fetch_manifest(&minfo.id, minfo.size_bytes).await?;
Ok((manifest, minfo))
}
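/// Scan `manifest` on a blocking thread and mark every chunk it references as retained.
/// Only `ChunkPayload::Ref` payloads carry a chunk id; other payload kinds are skipped.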
async fn chunks_retained(
keep_chunks: Arc<Mutex<HashSet<ChunkId>>>,
manifest: Arc<Manifest>,
minfo: ManifestFileInfo,
) -> RepositoryResult<ManifestFileInfo> {
task::spawn_blocking(move || {
let chunk_ids =
manifest.chunk_payloads().inject()?.filter_map(|payload| match payload {
Ok(ChunkPayload::Ref(chunk_ref)) => Some(chunk_ref.id.clone()),
Ok(_) => None,
Err(err) => {
tracing::error!(
error = %err,
"Error in chunk payload iterator"
);
None
}
});
keep_chunks
.lock()
.map_err(|_| {
RepositoryErrorKind::Other("can't lock retained chunks mutex".to_string())
})
.capture()?
.extend(chunk_ids);
Ok::<_, RepositoryError>(())
})
.await
.capture()??;
Ok(minfo)
}
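/// Walk the given snapshots and collect the ids of every chunk, manifest and snapshot that
/// must be kept. Snapshot buffering, manifest fetch concurrency and compressed manifest
/// memory are all bounded by `config`.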
#[instrument(skip_all)]
pub async fn find_retained(
asset_manager: Arc<AssetManager>,
config: &GCConfig,
snaps: impl Stream<Item = RepositoryResult<Arc<Snapshot>>>,
) -> GCResult<(HashSet<ChunkId>, HashSet<ManifestId>, HashSet<SnapshotId>)> {
let keep_chunks = Arc::new(Mutex::new(HashSet::new()));
let keep_manifests = Arc::new(Mutex::new(HashSet::new()));
let keep_snapshots = Arc::new(Mutex::new(HashSet::new()));
let all_manifest_infos = snaps
.map(ready)
.buffer_unordered(config.max_snapshots_in_memory.get() as usize)
.and_then(|snap| snapshot_retained(Arc::clone(&keep_snapshots), snap))
.try_flatten();
let manifest_infos = try_unique_stream(|mi| mi.id.clone(), all_manifest_infos);
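// Cap the total compressed manifest bytes resident in memory while fetching and scanning.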
let limiter = &Arc::new(StreamLimiter::new(
"garbage_collect".to_string(),
config.max_compressed_manifest_mem_bytes.get(),
));
let keep_chunks_ref = &keep_chunks;
let compute_stream = limiter
.limit_stream(manifest_infos, |minfo| minfo.size_bytes as usize)
.map_ok(|m| {
manifest_retained(Arc::clone(&keep_manifests), Arc::clone(&asset_manager), m)
})
.try_buffer_unordered(config.max_concurrent_manifest_fetches.get() as usize)
.and_then(move |(manifest, minfo)| {
chunks_retained(Arc::clone(keep_chunks_ref), manifest, minfo)
});
limiter
.unlimit_stream(compute_stream, |minfo| minfo.size_bytes as usize)
.try_for_each(|_| ready(Ok(())))
.await?;
debug_assert_eq!(limiter.current_usage(), 0);
#[expect(clippy::expect_used)]
Ok((
Arc::try_unwrap(keep_chunks)
.expect("Logic error: multiple owners to retained chunks")
.into_inner()
.expect("Logic error: multiple owners to retained chunks"),
Arc::try_unwrap(keep_manifests)
.expect("Logic error: multiple owners to retained manifests")
.into_inner()
.expect("Logic error: multiple owners to retained manifests"),
Arc::try_unwrap(keep_snapshots)
.expect("Logic error: multiple owners to retained snapshots")
.into_inner()
.expect("Logic error: multiple owners to retained snapshots"),
))
}
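/// Run garbage collection according to `config`, retrying with exponential backoff whenever
/// the repo info object is updated concurrently. Fails early if storage is read-only or,
/// for spec v2 repositories, the repository is not online.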
pub async fn garbage_collect(
asset_manager: Arc<AssetManager>,
config: &GCConfig,
repo_update_retries: Option<&RepoUpdateRetryConfig>,
num_updates_per_repo_info_file: u16,
) -> GCResult<GCSummary> {
if !asset_manager.can_write_to_storage().await? {
return Err(RepositoryErrorKind::ReadonlyStorage(
"Cannot garbage collect".to_string(),
))
.capture()
.map_err(GCError::Repository)?;
}
if asset_manager.spec_version() >= SpecVersionBin::V2 {
let (repo_info, _) = asset_manager.fetch_repo_info().await?;
if repo_info.status()?.availability != RepoAvailability::Online {
return Err(RepositoryErrorKind::ReadonlyRepository(
"Cannot garbage collect".to_string(),
))
.capture()
.map_err(GCError::Repository)?;
}
}
let default_retry_config = RepoUpdateRetryConfig::default();
let retry_config = repo_update_retries.unwrap_or(&default_retry_config).retries();
let gc = async || {
garbage_collect_one_attempt(
Arc::clone(&asset_manager),
config,
num_updates_per_repo_info_file,
)
.await
};
let backoff = ExponentialBuilder::new()
.with_min_delay(std::time::Duration::from_millis(
retry_config.initial_backoff_ms() as u64,
))
.with_max_delay(std::time::Duration::from_millis(
retry_config.max_backoff_ms() as u64
))
.with_max_times(retry_config.max_tries().get() as usize)
.with_jitter()
.build();
gc.retry(backoff)
.sleep(tokio::time::sleep)
.when(|e| {
matches!(
e,
GCError::Repository(RepositoryError {
kind: RepositoryErrorKind::RepoInfoUpdated,
..
})
)
})
.notify(|_, _| {
info!(
"Repo info object was updated while GC was running, retrying with backoff..."
);
})
.await
}
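/// A single GC attempt: compute the retained chunk, manifest and snapshot sets from the GC
/// roots, rewrite the repo info to drop released snapshots (spec v2, non dry-run only), and
/// then delete dangling snapshots, transaction logs, manifests and chunks per `config`.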
async fn garbage_collect_one_attempt(
asset_manager: Arc<AssetManager>,
config: &GCConfig,
num_updates_per_repo_info_file: u16,
) -> GCResult<GCSummary> {
if !config.action_needed() {
info!("No action requested");
return Ok(GCSummary::default());
}
info!("Finding GC roots");
let snap_deadline =
if let Action::DeleteIfCreatedBefore(date_time) = config.dangling_snapshots {
date_time
} else {
DateTime::<Utc>::MIN_UTC
};
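// For spec v2 repos, snapshots listed in the repo info that are too new to delete are kept
// as additional GC roots even if no ref points at them; `all_snaps` collects every snapshot
// id known to the repo info.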
let mut non_pointed_but_new = HashSet::new();
let mut all_snaps = HashSet::new();
let repo_info = if asset_manager.spec_version() > SpecVersionBin::V1 {
let (ri, _) = asset_manager.fetch_repo_info().await?;
non_pointed_but_new = ri
.all_snapshots()?
.filter_map_ok(|si| {
all_snaps.insert(si.id.clone());
if si.flushed_at >= snap_deadline { Some(si.id) } else { None }
})
.try_collect()?;
Some(ri)
} else {
None
};
let pointed_snaps =
pointed_snapshots(Arc::clone(&asset_manager), &config.extra_roots).await?;
let am = Arc::clone(&asset_manager);
let non_pointed_snaps = stream::iter(non_pointed_but_new.into_iter().map(Ok))
.and_then(move |id| {
let am = Arc::clone(&am);
async move { am.fetch_snapshot(&id).await }
});
let (keep_chunks, keep_manifests, mut keep_snapshots) = find_retained(
Arc::clone(&asset_manager),
config,
pointed_snaps.chain(non_pointed_snaps),
)
.await?;
info!(
snapshots = keep_snapshots.len(),
manifests = keep_manifests.len(),
chunks = keep_chunks.len(),
"Retained objects collected"
);
let mut summary = GCSummary::default();
info!("Starting deletes");
let drop_snapshots = all_snaps.difference(&keep_snapshots).cloned().collect();
if config.deletes_snapshots() {
if !config.dry_run && repo_info.is_some() {
delete_snapshots_from_repo_info(
asset_manager.as_ref(),
&mut keep_snapshots,
&drop_snapshots,
num_updates_per_repo_info_file,
)
.await?;
}
debug!("Garbage collecting snapshots");
let res = gc_snapshots(asset_manager.as_ref(), config, &keep_snapshots).await?;
summary.snapshots_deleted = res.deleted_objects;
summary.bytes_deleted += res.deleted_bytes;
}
drop(drop_snapshots);
drop(all_snaps);
if config.deletes_transaction_logs() {
let res =
gc_transaction_logs(asset_manager.as_ref(), config, &keep_snapshots).await?;
summary.transaction_logs_deleted = res.deleted_objects;
summary.bytes_deleted += res.deleted_bytes;
}
if config.deletes_manifests() {
let res = gc_manifests(asset_manager.as_ref(), config, &keep_manifests).await?;
summary.manifests_deleted = res.deleted_objects;
summary.bytes_deleted += res.deleted_bytes;
}
if config.deletes_chunks() {
asset_manager.clear_chunk_cache();
let res = gc_chunks(asset_manager.as_ref(), config, &keep_chunks).await?;
summary.chunks_deleted = res.deleted_objects;
summary.bytes_deleted += res.deleted_bytes;
}
Ok(summary)
}
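/// Rewrite the repo info so it no longer lists the snapshots in `drop_snapshots`.
///
/// Retained snapshots whose parent is being dropped are re-parented to the initial
/// snapshot. If a snapshot that is neither retained nor dropped has a dropped parent, or a
/// ref points outside the final snapshot set, the update fails with `RepoInfoUpdated` so
/// the whole GC attempt can be retried.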
async fn delete_snapshots_from_repo_info(
asset_manager: &AssetManager,
keep_snapshots: &mut HashSet<SnapshotId>,
drop_snapshots: &HashSet<SnapshotId>,
num_updates_per_repo_info_file: u16,
) -> GCResult<()> {
trace!("deleting snapshots from repo info");
let do_update = |repo_info: Arc<RepoInfo>, backup_path: &str, _| {
let mut final_snaps = HashSet::with_capacity(2 * keep_snapshots.len());
for si in repo_info.all_snapshots().inject()? {
let si = si.inject()?;
#[expect(clippy::panic)]
match (keep_snapshots.contains(&si.id), drop_snapshots.contains(&si.id)) {
(true, false) => {
if let Some(parent) = &si.parent_id
&& drop_snapshots.contains(parent)
{
final_snaps.insert(SnapshotInfo {
parent_id: Some(Snapshot::INITIAL_SNAPSHOT_ID),
..si
});
} else {
final_snaps.insert(si);
}
}
(false, true) => {
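// The snapshot is in the drop set: omit it from the rewritten repo info.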
}
(false, false) => {
if let Some(parent) = &si.parent_id
&& drop_snapshots.contains(parent)
{
return Err(RepositoryError::capture(
RepositoryErrorKind::RepoInfoUpdated,
));
} else {
keep_snapshots.insert(si.id.clone());
final_snaps.insert(si);
}
}
(true, true) => {
panic!("Logic error, snapshot must be both retained and deleted")
}
}
}
let final_snap_ids: HashSet<_> = final_snaps.iter().map(|si| &si.id).collect();
for (_, pointed_snap) in
repo_info.tags().inject()?.chain(repo_info.branches().inject()?)
{
if !final_snap_ids.contains(&pointed_snap) {
return Err(RepositoryError::capture(
RepositoryErrorKind::RepoInfoUpdated,
));
}
}
let config_bytes = repo_info.config_bytes_raw().inject()?;
let new_repo_info = RepoInfo::new(
asset_manager.spec_version(),
repo_info.tags().inject()?,
repo_info.branches().inject()?,
repo_info.deleted_tags().inject()?,
final_snaps,
&repo_info.metadata().inject()?,
UpdateInfo {
update_type: UpdateType::GCRanUpdate,
update_time: Utc::now(),
previous_updates: repo_info.latest_updates().inject()?,
},
Some(backup_path),
num_updates_per_repo_info_file,
repo_info.repo_before_updates().inject()?,
config_bytes.as_deref(),
repo_info.enabled_feature_flags().inject()?,
repo_info.disabled_feature_flags().inject()?,
&repo_info.status().inject()?,
)
.inject()?;
Ok(Arc::new(new_repo_info))
};
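// A single try at the storage level; conflicts surface as `RepoInfoUpdated` and are
// handled by the outer retry loop in `garbage_collect`.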
let retry_settings = storage::RetriesSettings {
max_tries: Some(NonZeroU16::MIN),
..Default::default()
};
let _ = asset_manager.update_repo_info(&retry_settings, do_update).await?;
Ok(())
}
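/// Dry-run helper: fold the stream of would-be deletions into a `DeleteObjectsResult`
/// without deleting anything.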
async fn fake_delete_result<const SIZE: usize, T: FileTypeTag>(
to_delete: impl Stream<Item = (ObjectId<SIZE, T>, u64)>,
) -> DeleteObjectsResult {
to_delete
.fold(DeleteObjectsResult::default(), |mut res, (_, size)| {
res.deleted_objects += 1;
res.deleted_bytes += size;
ready(res)
})
.await
}
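/// List all chunks and delete those created before the configured cutoff and not present
/// in `keep_ids`. In dry-run mode only the would-be deletions are counted.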
#[instrument(skip(asset_manager, config, keep_ids), fields(keep_ids.len = keep_ids.len()))]
pub async fn gc_chunks(
asset_manager: &AssetManager,
config: &GCConfig,
keep_ids: &HashSet<ChunkId>,
) -> GCResult<DeleteObjectsResult> {
info!("Deleting chunks");
let to_delete = asset_manager
.list_chunks()
.await?
.inspect_err(|e| error!("Deleting chunks: {e}"))
.filter_map(move |chunk| {
ready(chunk.ok().and_then(|chunk| {
if config.must_delete_chunk(&chunk) && !keep_ids.contains(&chunk.id) {
Some((chunk.id.clone(), chunk.size_bytes))
} else {
None
}
}))
})
.boxed();
if config.dry_run {
Ok(fake_delete_result(to_delete).await)
} else {
Ok(asset_manager.delete_chunks(to_delete).await?)
}
}
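/// List all manifests and delete those created before the configured cutoff and not present
/// in `keep_ids`, evicting each from the manifest cache first.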
#[instrument(skip(asset_manager, config, keep_ids), fields(keep_ids.len = keep_ids.len()))]
pub async fn gc_manifests(
asset_manager: &AssetManager,
config: &GCConfig,
keep_ids: &HashSet<ManifestId>,
) -> GCResult<DeleteObjectsResult> {
info!("Deleting manifests");
let to_delete = asset_manager
.list_manifests()
.await?
.inspect_err(|e| error!("Deleting manifests: {e}"))
.filter_map(move |manifest| {
ready(manifest.ok().and_then(|manifest| {
if config.must_delete_manifest(&manifest)
&& !keep_ids.contains(&manifest.id)
{
asset_manager.remove_cached_manifest(&manifest.id);
Some((manifest.id.clone(), manifest.size_bytes))
} else {
None
}
}))
})
.boxed();
if config.dry_run {
Ok(fake_delete_result(to_delete).await)
} else {
Ok(asset_manager.delete_manifests(to_delete).await?)
}
}
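/// List all snapshots and delete those created before the configured cutoff and not present
/// in `keep_ids`, evicting each from the snapshot cache first.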
#[instrument(skip(asset_manager, config, keep_ids), fields(keep_ids.len = keep_ids.len()))]
pub async fn gc_snapshots(
asset_manager: &AssetManager,
config: &GCConfig,
keep_ids: &HashSet<SnapshotId>,
) -> GCResult<DeleteObjectsResult> {
info!("Deleting snapshots");
let to_delete = asset_manager
.list_snapshots()
.await?
.inspect_err(|e| error!("Deleting snapshots: {e}"))
.filter_map(move |snapshot| {
ready(snapshot.ok().and_then(|snapshot| {
if config.must_delete_snapshot(&snapshot)
&& !keep_ids.contains(&snapshot.id)
{
asset_manager.remove_cached_snapshot(&snapshot.id);
Some((snapshot.id.clone(), snapshot.size_bytes))
} else {
None
}
}))
})
.boxed();
if config.dry_run {
Ok(fake_delete_result(to_delete).await)
} else {
Ok(asset_manager.delete_snapshots(to_delete).await?)
}
}
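/// List all transaction logs and delete those created before the configured cutoff and not
/// present in `keep_ids`, evicting each from the transaction log cache first.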
#[instrument(skip(asset_manager, config, keep_ids), fields(keep_ids.len = keep_ids.len()))]
pub async fn gc_transaction_logs(
asset_manager: &AssetManager,
config: &GCConfig,
keep_ids: &HashSet<SnapshotId>,
) -> GCResult<DeleteObjectsResult> {
info!("Deleting transaction logs");
let to_delete = asset_manager
.list_transaction_logs()
.await?
.inspect_err(|e| error!("Deleting transaction logs: {e}"))
.filter_map(move |tx| {
ready(tx.ok().and_then(|tx| {
if config.must_delete_transaction_log(&tx) && !keep_ids.contains(&tx.id) {
asset_manager.remove_cached_tx_log(&tx.id);
Some((tx.id.clone(), tx.size_bytes))
} else {
None
}
}))
})
.boxed();
if config.dry_run {
Ok(fake_delete_result(to_delete).await)
} else {
Ok(asset_manager.delete_transaction_logs(to_delete).await?)
}
}
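/// What to do with a branch or tag whose tip snapshot has expired.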
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ExpiredRefAction {
Delete,
Ignore,
}
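/// Outcome of an expiration run: snapshots released from history, snapshots whose parent
/// was rewritten, and refs that were deleted.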
#[derive(Debug, PartialEq, Eq, Clone, Default)]
pub struct ExpireResult {
pub released_snapshots: HashSet<SnapshotId>,
pub edited_snapshots: HashSet<SnapshotId>,
pub deleted_refs: HashSet<Ref>,
}
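/// Expire snapshots flushed before `older_than`, dispatching to the spec v1 or spec v2
/// implementation. Fails early if storage is read-only or, for spec v2 repositories, the
/// repository is not online.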
#[instrument(skip(asset_manager))]
pub async fn expire(
asset_manager: Arc<AssetManager>,
older_than: DateTime<Utc>,
expired_branches: ExpiredRefAction,
expired_tags: ExpiredRefAction,
repo_update_retries: Option<&RepoUpdateRetryConfig>,
num_updates_per_repo_info_file: u16,
) -> GCResult<ExpireResult> {
if !asset_manager.can_write_to_storage().await? {
return Err(RepositoryErrorKind::ReadonlyStorage("Cannot expire".to_string()))
.capture()
.map_err(GCError::Repository)?;
}
if asset_manager.spec_version() >= SpecVersionBin::V2 {
let (repo_info, _) = asset_manager.fetch_repo_info().await?;
if repo_info.status()?.availability != RepoAvailability::Online {
return Err(RepositoryErrorKind::ReadonlyRepository(
"Cannot garbage collect".to_string(),
))
.capture()
.map_err(GCError::Repository)?;
}
}
match asset_manager.spec_version() {
SpecVersionBin::V1 => {
super::expiration_v1::expire(
asset_manager,
older_than,
expired_branches,
expired_tags,
)
.await
}
SpecVersionBin::V2 => {
expire_v2(
asset_manager,
older_than,
expired_branches,
expired_tags,
repo_update_retries,
num_updates_per_repo_info_file,
)
.await
}
}
}
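/// Spec v2 expiration: runs `expire_v2_one_attempt` under exponential backoff, retrying
/// while the repo info object keeps being updated concurrently.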
#[instrument(skip(asset_manager))]
pub async fn expire_v2(
asset_manager: Arc<AssetManager>,
older_than: DateTime<Utc>,
expired_branches: ExpiredRefAction,
expired_tags: ExpiredRefAction,
repo_update_retries: Option<&RepoUpdateRetryConfig>,
num_updates_per_repo_info_file: u16,
) -> GCResult<ExpireResult> {
let default_retry_config = RepoUpdateRetryConfig::default();
let retry_config = repo_update_retries.unwrap_or(&default_retry_config).retries();
let backoff = ExponentialBuilder::new()
.with_min_delay(std::time::Duration::from_millis(
retry_config.initial_backoff_ms() as u64,
))
.with_max_delay(std::time::Duration::from_millis(
retry_config.max_backoff_ms() as u64
))
.with_max_times(retry_config.max_tries().get() as usize)
.with_jitter()
.build();
let expire = async || {
expire_v2_one_attempt(
Arc::clone(&asset_manager),
older_than,
expired_branches,
expired_tags,
num_updates_per_repo_info_file,
)
.await
};
expire.retry(backoff)
.sleep(tokio::time::sleep)
.when(|e| {
matches!(
e,
GCError::Repository(RepositoryError {
kind: RepositoryErrorKind::RepoInfoUpdated,
..
})
)
})
.notify(|_, _| {
info!(
"Repo info object was updated while expire was running, retrying with backoff..."
);
})
.await
}
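/// A single spec v2 expiration attempt: read the repo info, release snapshots flushed
/// before `older_than` (excluding history roots and, depending on the flags, ref tips and
/// the default branch tip), re-parent retained snapshots whose parent was released, delete
/// expired branches and tags if requested, and write the updated repo info. Fails with
/// `RepoInfoUpdated` if the repo info changed since it was read, so the caller can retry.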
#[instrument(skip(asset_manager))]
async fn expire_v2_one_attempt(
asset_manager: Arc<AssetManager>,
older_than: DateTime<Utc>,
expired_branches: ExpiredRefAction,
expired_tags: ExpiredRefAction,
num_updates_per_repo_info_file: u16,
) -> GCResult<ExpireResult> {
info!("Expiration started");
let (repo_info, repo_info_version_at_start) = asset_manager.fetch_repo_info().await?;
let tags: Vec<(Ref, SnapshotId)> = repo_info
.tags()?
.map(|(name, snap)| Ok::<_, GCError>((Ref::Tag(name.to_string()), snap)))
.try_collect()?;
let branches: Vec<(Ref, SnapshotId)> = repo_info
.branches()?
.map(|(name, snap)| Ok::<_, GCError>((Ref::Branch(name.to_string()), snap)))
.try_collect()?;
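// Split an ancestry iterator into the set of ids of snapshots that have a parent, plus the
// root snapshot (the one without a parent), if it was seen.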
fn split_root<E>(
mut iter: impl Iterator<Item = Result<SnapshotInfo, E>>,
) -> Result<(HashSet<SnapshotId>, Option<SnapshotId>), E> {
iter.try_fold((HashSet::new(), None), |(mut all, root), snap| match snap {
Ok(snap) if snap.parent_id.is_some() => {
all.insert(snap.id);
Ok((all, root))
}
Ok(snap) => Ok((all, Some(snap.id))),
Err(err) => Err(err),
})
}
debug!("Finding roots");
let mut all_tips = tags.iter().chain(branches.iter());
let root_to_snaps = all_tips.try_fold(
HashMap::new(),
|mut res: HashMap<SnapshotId, HashSet<SnapshotId>>, (_, tip_snap)| {
let ancestry = repo_info.ancestry(tip_snap)?;
let (branch_snaps, root) = split_root(ancestry)?;
let root = root.unwrap_or(Snapshot::INITIAL_SNAPSHOT_ID);
match res.get_mut(&root) {
Some(s) => {
s.extend(branch_snaps);
}
None => {
res.insert(root, branch_snaps);
}
};
Ok::<_, GCError>(res)
},
)?;
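// For a retained snapshot whose parent was released, find the root of the history it
// belongs to; that root becomes its new parent.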
let new_parent = move |id: &SnapshotId| {
for (new_parent, all) in root_to_snaps.iter() {
if all.contains(id) {
return Some(new_parent.clone());
}
}
None
};
debug!("Finding ref tips");
let tag_tip_ids: HashSet<SnapshotId> = repo_info.tags()?.map(|(_, id)| id).collect();
let branch_tip_ids: HashSet<SnapshotId> =
repo_info.branches()?.map(|(_, id)| id).collect();
let main_pointee = repo_info.resolve_branch(Ref::DEFAULT_BRANCH)?;
let expired_snapshot_infos: Vec<SnapshotInfo> = repo_info
.all_snapshots()?
.filter_map(|si| match si {
Ok(si) if si.flushed_at < older_than && si.parent_id.is_some() => {
Some(Ok(si))
}
Ok(_) => None,
Err(e) => Some(Err(e)),
})
.try_collect()?;
debug!("Calculating released snapshots");
let released_snapshots: HashSet<SnapshotId> = expired_snapshot_infos
.iter()
.filter_map(|si| {
if si.flushed_at < older_than && si.parent_id.is_some() {
use ExpiredRefAction::*;
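// Even though it expired, keep a snapshot that is a tag tip when expired tags are
// ignored, or a branch tip when expired branches are ignored or it is the tip of the
// default branch.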
if expired_tags == Ignore && tag_tip_ids.contains(&si.id)
|| (expired_branches == Ignore || si.id == main_pointee)
&& branch_tip_ids.contains(&si.id)
{
None
} else {
Some(si.id.clone())
}
} else {
None
}
})
.collect();
let expired_snapshots: HashSet<SnapshotId> =
expired_snapshot_infos.into_iter().map(|x| x.id).collect();
let num_released_snapshots = released_snapshots.len();
debug!("Calculating retained snapshots");
let mut edited_snapshots = HashSet::new();
let retained: Vec<_> = repo_info
.all_snapshots()?
.filter_map(|si| match si {
Ok(si) if released_snapshots.contains(&si.id) => None,
Ok(si) => match si.parent_id.as_ref() {
Some(parent_id) => {
if released_snapshots.contains(parent_id) {
edited_snapshots.insert(si.id.clone());
Some(Ok(SnapshotInfo { parent_id: new_parent(&si.id), ..si }))
} else {
Some(Ok(si))
}
}
None => Some(Ok(si)),
},
Err(e) => Some(Err(e)),
})
.try_collect()?;
debug!("Calculating deleted refs");
let mut deleted_tags: HashSet<_> = tags
.into_iter()
.filter_map(|(r, snap_id)| {
if expired_tags == ExpiredRefAction::Delete
&& expired_snapshots.contains(&snap_id)
{
Some(r)
} else {
None
}
})
.collect();
let deleted_branches: HashSet<_> = branches
.into_iter()
.filter_map(|(r, snap_id)| {
if expired_branches == ExpiredRefAction::Delete
&& r.name() != Ref::DEFAULT_BRANCH
&& expired_snapshots.contains(&snap_id)
{
Some(r)
} else {
None
}
})
.collect();
info!(
snapshots = num_released_snapshots,
branches = deleted_branches.iter().map(|r| r.name()).join("/"),
tags = deleted_tags.iter().map(|r| r.name()).join("/"),
"Releasing objects"
);
let do_update = |repo_info: Arc<RepoInfo>, backup_path: &str, version| {
if version != repo_info_version_at_start {
return Err(RepositoryError::capture(RepositoryErrorKind::RepoInfoUpdated));
}
let tags = repo_info
.tags()
.inject()?
.filter(|(name, _)| !deleted_tags.contains(&Ref::Tag(name.to_string())));
let branches = repo_info.branches().inject()?.filter(|(name, _)| {
!deleted_branches.contains(&Ref::Branch(name.to_string()))
});
let deleted_tag_names = repo_info.deleted_tags().inject()?.chain(
deleted_tags.iter().filter_map(|r| match r {
Ref::Tag(name) => Some(name.as_str()),
Ref::Branch(_) => None,
}),
);
let config_bytes = repo_info.config_bytes_raw().inject()?;
let new_repo_info = RepoInfo::new(
asset_manager.spec_version(),
tags,
branches,
deleted_tag_names,
retained.clone(),
&repo_info.metadata().inject()?,
UpdateInfo {
update_type: UpdateType::ExpirationRanUpdate,
update_time: Utc::now(),
previous_updates: repo_info.latest_updates().inject()?,
},
Some(backup_path),
num_updates_per_repo_info_file,
repo_info.repo_before_updates().inject()?,
config_bytes.as_deref(),
repo_info.enabled_feature_flags().inject()?,
repo_info.disabled_feature_flags().inject()?,
&repo_info.status().inject()?,
)
.inject()?;
Ok(Arc::new(new_repo_info))
};
let retry_settings = storage::RetriesSettings {
max_tries: Some(NonZeroU16::MIN),
..Default::default()
};
let _ = asset_manager.update_repo_info(&retry_settings, do_update).await?;
deleted_tags.extend(deleted_branches);
debug!("Expiration done");
Ok(ExpireResult { released_snapshots, edited_snapshots, deleted_refs: deleted_tags })
}