use std::{
collections::HashMap,
num::{NonZeroU16, NonZeroUsize},
sync::Arc,
};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::{StreamExt as _, TryStreamExt as _};
use icechunk::{
Repository, RepositoryConfig, Storage,
asset_manager::AssetManager,
config::{
DEFAULT_MAX_CONCURRENT_REQUESTS, ManifestConfig, ManifestSplitCondition,
ManifestSplitDim, ManifestSplitDimCondition, ManifestSplittingConfig,
},
format::{
ByteRange, ChunkIndices, Path, format_constants::SpecVersionBin,
manifest::ChunkPayload, snapshot::ArrayShape,
},
new_in_memory_storage,
ops::gc::{ExpiredRefAction, GCConfig, GCSummary, expire, garbage_collect},
refs::Ref,
repository::VersionInfo,
session::get_chunk,
};
use icechunk_macros::tokio_test;
use pretty_assertions::assert_eq;
use crate::common;
use crate::common::Permission;
#[tokio_test]
pub async fn test_gc_in_minio_spec_v1() -> Result<(), Box<dyn std::error::Error>> {
    // Millisecond timestamp keeps the bucket prefix unique per run.
    let bucket_prefix = format!("test_gc_v1_{}", Utc::now().timestamp_millis());
    let backend =
        common::make_minio_integration_storage(bucket_prefix, &Permission::Modify)?;
    // Pin the on-disk format to spec v1.
    do_test_gc(backend, Some(SpecVersionBin::V1)).await
}
#[tokio_test]
pub async fn test_gc_in_minio_spec_v2() -> Result<(), Box<dyn std::error::Error>> {
    // Millisecond timestamp keeps the bucket prefix unique per run.
    let bucket_prefix = format!("test_gc_v2_{}", Utc::now().timestamp_millis());
    let backend =
        common::make_minio_integration_storage(bucket_prefix, &Permission::Modify)?;
    // Pin the on-disk format to spec v2.
    do_test_gc(backend, Some(SpecVersionBin::V2)).await
}
#[tokio_test]
#[ignore = "needs credentials from env"]
pub async fn test_gc_in_aws() -> Result<(), Box<dyn std::error::Error>> {
    // Unique per-run prefix so repeated runs don't collide in the bucket.
    let bucket_prefix = format!("test_gc_{}", Utc::now().timestamp_millis());
    // No pinned spec version: use the repository default.
    do_test_gc(common::make_aws_integration_storage(bucket_prefix)?, None).await
}
#[tokio_test]
#[ignore = "needs credentials from env"]
pub async fn test_gc_in_r2() -> Result<(), Box<dyn std::error::Error>> {
    // Unique per-run prefix so repeated runs don't collide in the bucket.
    let bucket_prefix = format!("test_gc_{}", Utc::now().timestamp_millis());
    // No pinned spec version: use the repository default.
    do_test_gc(common::make_r2_integration_storage(bucket_prefix)?, None).await
}
#[tokio_test]
#[ignore = "needs credentials from env"]
pub async fn test_gc_in_tigris() -> Result<(), Box<dyn std::error::Error>> {
    // Unique per-run prefix so repeated runs don't collide in the bucket.
    let bucket_prefix = format!("test_gc_{}", Utc::now().timestamp_millis());
    // No pinned spec version: use the repository default.
    do_test_gc(common::make_tigris_integration_storage(bucket_prefix)?, None).await
}
/// End-to-end garbage-collection scenario, run against an arbitrary storage
/// backend and (optionally) a pinned spec version.
///
/// Outline:
/// 1. Create a repo that forces every chunk into its own object
///    (`inline_chunk_threshold_bytes: 0`) and splits manifests every 10
///    chunks, so one commit produces many small manifests.
/// 2. Commit 1100 chunks, then overwrite the first 10 in a second commit.
/// 3. GC while both commits are reachable: nothing may be deleted.
/// 4. Reset `main` to the first commit; GC must now delete exactly the 10
///    overwritten chunks, 1 manifest, and the orphaned snapshot.
/// 5. Create anonymous (ref-less) commits and check GC removes only the ones
///    older than the cutoff.
async fn do_test_gc(
    storage: Arc<dyn Storage + Send + Sync>,
    spec_version: Option<SpecVersionBin>,
) -> Result<(), Box<dyn std::error::Error>> {
    // Single-dimension array; 1100 distinct chunk indices are written below.
    let shape = ArrayShape::new(vec![(1100, 1100)]).unwrap();
    // Split manifests every 10 chunks => 1100 chunks / 10 = 110 manifests.
    let manifest_split_size = 10;
    let split_sizes = Some(vec![(
        // Apply the split rule to every array path.
        ManifestSplitCondition::PathMatches { regex: r".*".to_string() },
        vec![ManifestSplitDim {
            condition: ManifestSplitDimCondition::Any,
            num_chunks: manifest_split_size,
        }],
    )]);
    let man_config = ManifestConfig {
        splitting: Some(ManifestSplittingConfig { split_sizes }),
        ..ManifestConfig::default()
    };
    // Threshold 0 disables inline chunks: every chunk becomes a storage
    // object that GC can later delete and count.
    let repo = Repository::create(
        Some(RepositoryConfig {
            inline_chunk_threshold_bytes: Some(0),
            manifest: Some(man_config),
            ..Default::default()
        }),
        Arc::clone(&storage),
        HashMap::new(),
        spec_version,
        true,
    )
    .await?;
    let mut ds = repo.writable_session("main").await?;
    let user_data = Bytes::new();
    ds.add_group(Path::root(), user_data.clone()).await?;
    let array_path: Path = "/array".try_into().unwrap();
    ds.add_array(array_path.clone(), shape, None, user_data.clone()).await?;
    // First commit: 1100 one-byte chunks, all holding the value 42.
    for idx in 0..1100 {
        let bytes = Bytes::copy_from_slice(&42i8.to_be_bytes());
        let payload = ds.get_chunk_writer()?(bytes.clone()).await?;
        ds.set_chunk_ref(array_path.clone(), ChunkIndices(vec![idx]), Some(payload))
            .await?;
    }
    let first_snap_id = ds.commit("first").max_concurrent_nodes(8).execute().await?;
    assert_eq!(repo.asset_manager().list_chunks().await?.count().await, 1100);
    assert_eq!(repo.asset_manager().list_manifests().await?.count().await, 110);
    // Second commit: overwrite the first 10 chunks with 0. This adds 10 new
    // chunk objects and one rewritten manifest split (111 manifests total).
    let mut ds = repo.writable_session("main").await?;
    for idx in 0..10 {
        let bytes = Bytes::copy_from_slice(&0i8.to_be_bytes());
        let payload = ds.get_chunk_writer()?(bytes.clone()).await?;
        ds.set_chunk_ref(array_path.clone(), ChunkIndices(vec![idx]), Some(payload))
            .await?;
    }
    let _second_snap_id = ds.commit("second").max_concurrent_nodes(8).execute().await?;
    assert_eq!(repo.asset_manager().list_chunks().await?.count().await, 1110);
    assert_eq!(repo.asset_manager().list_manifests().await?.count().await, 111);
    // GC with everything still reachable from `main`: nothing is deleted.
    let now = Utc::now();
    let gc_config = GCConfig::clean_all(
        now,
        now,
        None,
        NonZeroU16::new(50).unwrap(),
        NonZeroUsize::new(512 * 1024 * 1024).unwrap(),
        NonZeroU16::new(500).unwrap(),
        false,
    );
    let summary =
        garbage_collect(Arc::clone(repo.asset_manager()), &gc_config, None, 100).await?;
    assert_eq!(summary, GCSummary::default());
    assert_eq!(repo.asset_manager().list_chunks().await?.count().await, 1110);
    // Overwritten chunks read back the new value (0).
    for idx in 0..10 {
        let bytes = get_chunk(
            ds.get_chunk_reader(&array_path, &ChunkIndices(vec![idx]), &ByteRange::ALL)
                .await?,
        )
        .await?
        .unwrap();
        assert_eq!(&0i8.to_be_bytes(), bytes.as_ref());
    }
    // Orphan the second commit, then GC: exactly its 10 chunks, 1 manifest
    // and 1 snapshot become garbage.
    repo.reset_branch("main", &first_snap_id, None).await?;
    assert_eq!(repo.asset_manager().list_chunks().await?.count().await, 1110);
    assert_eq!(repo.asset_manager().list_manifests().await?.count().await, 111);
    let summary =
        garbage_collect(Arc::clone(repo.asset_manager()), &gc_config, None, 100).await?;
    assert_eq!(summary.chunks_deleted, 10);
    assert_eq!(summary.manifests_deleted, 1);
    assert_eq!(summary.snapshots_deleted, 1);
    // Deleted bytes include manifest/snapshot files, so the total must exceed
    // one byte per deleted chunk.
    assert!(summary.bytes_deleted > summary.chunks_deleted);
    assert_eq!(repo.asset_manager().list_chunks().await?.count().await, 1100);
    assert_eq!(repo.asset_manager().list_manifests().await?.count().await, 110);
    // Remaining snapshots: the initial one plus "first".
    assert_eq!(repo.asset_manager().list_snapshots().await?.count().await, 2);
    // After the reset, the branch tip reads the original value (42) again.
    let ds =
        repo.readonly_session(&VersionInfo::BranchTipRef("main".to_string())).await?;
    for idx in 0..10 {
        let bytes = get_chunk(
            ds.get_chunk_reader(&array_path, &ChunkIndices(vec![idx]), &ByteRange::ALL)
                .await?,
        )
        .await?
        .unwrap();
        assert_eq!(&42i8.to_be_bytes(), bytes.as_ref());
    }
    // Five anonymous commits (snapshots not referenced by any branch or tag).
    let mut anon_snaps = vec![];
    for i in 0..5 {
        let mut session = repo.writable_session("main").await?;
        let bytes = Bytes::copy_from_slice(&(100i8 + i as i8).to_be_bytes());
        let payload = session.get_chunk_writer()?(bytes.clone()).await?;
        session
            .set_chunk_ref(array_path.clone(), ChunkIndices(vec![0]), Some(payload))
            .await?;
        let snap_id = session.commit(format!("anon {i}")).anonymous().execute().await?;
        anon_snaps.push(snap_id);
    }
    // GC with the cutoff at the third anonymous commit: only the two older
    // anonymous snapshots are collected.
    let cutoff = repo.lookup_snapshot(&anon_snaps[2]).await?.flushed_at;
    let gc_config = GCConfig::clean_all(
        cutoff,
        cutoff,
        None,
        NonZeroU16::new(50).unwrap(),
        NonZeroUsize::new(512 * 1024 * 1024).unwrap(),
        NonZeroU16::new(500).unwrap(),
        false,
    );
    let summary =
        garbage_collect(Arc::clone(repo.asset_manager()), &gc_config, None, 100).await?;
    assert_eq!(summary.snapshots_deleted, 2);
    // The surviving anonymous snapshots must still be openable.
    for snap_id in &anon_snaps[2..] {
        repo.readonly_session(&VersionInfo::SnapshotId(snap_id.clone())).await?;
    }
    Ok(())
}
/// Test helper: commit messages reachable from `branch`'s tip, newest first.
/// Panics on any repository error.
async fn branch_commit_messages(repo: &Repository, branch: &str) -> Vec<String> {
    let tip = VersionInfo::BranchTipRef(branch.to_string());
    let history = repo.ancestry(&tip).await.unwrap();
    history.map_ok(|snapshot| snapshot.message).try_collect().await.unwrap()
}
/// Test helper: commit messages reachable from `tag`'s snapshot, newest first.
/// Panics on any repository error.
async fn tag_commit_messages(repo: &Repository, tag: &str) -> Vec<String> {
    let reference = VersionInfo::TagRef(tag.to_string());
    let history = repo.ancestry(&reference).await.unwrap();
    history.map_ok(|snapshot| snapshot.message).try_collect().await.unwrap()
}
/// Builds the branching history from the GC/expiration design doc and returns
/// a cutoff timestamp captured right after commit "7".
///
/// Final ref ancestries (tips first), asserted at the end:
///   main:    14, 13, 12, 5, 4, 2, 1
///   develop: 11, 10, 6, 3, 2, 1   (branched from commit 2)
///   test:     9,  7, 6, 3, 2, 1   (branched from commit 6)
///   qa:       8,  7, 6, 3, 2, 1   (branched from commit 7)
///   tag1 -> commit "3", tag2 -> commit "5"
///
/// Commits 8-14 (and 9, 10, 11) are created *after* the returned timestamp,
/// so expiring "older than" it affects only commits 1-7.
async fn make_design_doc_repo(
    repo: &mut Repository,
) -> Result<DateTime<Utc>, Box<dyn std::error::Error>> {
    // main: commits 1, 2, 4, 5 — each adds one empty group.
    let mut session = repo.writable_session("main").await?;
    let user_data = Bytes::new();
    session.add_group(Path::root(), user_data.clone()).await?;
    session.commit("1").max_concurrent_nodes(8).execute().await?;
    let mut session = repo.writable_session("main").await?;
    session.add_group(Path::try_from("/2").unwrap(), user_data.clone()).await?;
    let commit_2 = session.commit("2").max_concurrent_nodes(8).execute().await?;
    let mut session = repo.writable_session("main").await?;
    session.add_group(Path::try_from("/4").unwrap(), user_data.clone()).await?;
    session.commit("4").max_concurrent_nodes(8).execute().await?;
    let mut session = repo.writable_session("main").await?;
    session.add_group(Path::try_from("/5").unwrap(), user_data.clone()).await?;
    let snap_id = session.commit("5").max_concurrent_nodes(8).execute().await?;
    // tag2 pins commit "5"; develop forks from commit "2".
    repo.create_tag("tag2", &snap_id).await?;
    repo.create_branch("develop", &commit_2).await?;
    let mut session = repo.writable_session("develop").await?;
    session.add_group(Path::try_from("/3").unwrap(), user_data.clone()).await?;
    let snap_id = session.commit("3").max_concurrent_nodes(8).execute().await?;
    // tag1 pins commit "3".
    repo.create_tag("tag1", &snap_id).await?;
    let mut session = repo.writable_session("develop").await?;
    session.add_group(Path::try_from("/6").unwrap(), user_data.clone()).await?;
    let commit_6 = session.commit("6").max_concurrent_nodes(8).execute().await?;
    // test forks from commit "6" and gains commit "7".
    repo.create_branch("test", &commit_6).await?;
    let mut session = repo.writable_session("test").await?;
    session.add_group(Path::try_from("/7").unwrap(), user_data.clone()).await?;
    let commit_7 = session.commit("7").max_concurrent_nodes(8).execute().await?;
    // Cutoff: everything committed above (1-7) is "old"; everything created
    // after this point is "new".
    let expire_older_than = Utc::now();
    // qa forks from commit "7" and gains commit "8".
    repo.create_branch("qa", &commit_7).await?;
    let mut session = repo.writable_session("qa").await?;
    session.add_group(Path::try_from("/8").unwrap(), user_data.clone()).await?;
    session.commit("8").max_concurrent_nodes(8).execute().await?;
    // main gains commits 12, 13, 14.
    let mut session = repo.writable_session("main").await?;
    session.add_group(Path::try_from("/12").unwrap(), user_data.clone()).await?;
    session.commit("12").max_concurrent_nodes(8).execute().await?;
    let mut session = repo.writable_session("main").await?;
    session.add_group(Path::try_from("/13").unwrap(), user_data.clone()).await?;
    session.commit("13").max_concurrent_nodes(8).execute().await?;
    let mut session = repo.writable_session("main").await?;
    session.add_group(Path::try_from("/14").unwrap(), user_data.clone()).await?;
    session.commit("14").max_concurrent_nodes(8).execute().await?;
    // develop gains commits 10, 11.
    let mut session = repo.writable_session("develop").await?;
    session.add_group(Path::try_from("/10").unwrap(), user_data.clone()).await?;
    session.commit("10").max_concurrent_nodes(8).execute().await?;
    let mut session = repo.writable_session("develop").await?;
    session.add_group(Path::try_from("/11").unwrap(), user_data.clone()).await?;
    session.commit("11").max_concurrent_nodes(8).execute().await?;
    // test gains commit 9.
    let mut session = repo.writable_session("test").await?;
    session.add_group(Path::try_from("/9").unwrap(), user_data.clone()).await?;
    session.commit("9").max_concurrent_nodes(8).execute().await?;
    // Sanity-check the constructed graph before handing it to callers.
    assert_eq!(
        branch_commit_messages(repo, "main").await,
        Vec::from(["14", "13", "12", "5", "4", "2", "1", "Repository initialized"])
    );
    assert_eq!(
        branch_commit_messages(repo, "develop").await,
        Vec::from(["11", "10", "6", "3", "2", "1", "Repository initialized"])
    );
    assert_eq!(
        branch_commit_messages(repo, "test").await,
        Vec::from(["9", "7", "6", "3", "2", "1", "Repository initialized"])
    );
    assert_eq!(
        branch_commit_messages(repo, "qa").await,
        Vec::from(["8", "7", "6", "3", "2", "1", "Repository initialized"])
    );
    Ok(expire_older_than)
}
#[tokio_test]
pub async fn test_expire_and_garbage_collect_in_memory()
-> Result<(), Box<dyn std::error::Error>> {
    // Fast path: same scenario as the object-store variants, fully in memory.
    let backend: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
    do_test_expire_and_garbage_collect(backend).await
}
#[tokio_test]
pub async fn test_expire_and_garbage_collect_in_minio()
-> Result<(), Box<dyn std::error::Error>> {
    // Unique per-run prefix to avoid collisions in the shared bucket.
    let bucket_prefix =
        format!("test_expire_and_garbage_collect_{}", Utc::now().timestamp_millis());
    let backend: Arc<dyn Storage + Send + Sync> =
        common::make_minio_integration_storage(bucket_prefix, &Permission::Modify)?;
    do_test_expire_and_garbage_collect(backend).await
}
#[tokio_test]
#[ignore = "needs credentials from env"]
pub async fn test_expire_and_garbage_collect_in_aws()
-> Result<(), Box<dyn std::error::Error>> {
    // Unique per-run prefix to avoid collisions in the shared bucket.
    let bucket_prefix =
        format!("test_expire_and_garbage_collect_{}", Utc::now().timestamp_millis());
    let backend: Arc<dyn Storage + Send + Sync> =
        common::make_aws_integration_storage(bucket_prefix)?;
    do_test_expire_and_garbage_collect(backend).await
}
#[tokio_test]
#[ignore = "needs credentials from env"]
pub async fn test_expire_and_garbage_collect_in_r2()
-> Result<(), Box<dyn std::error::Error>> {
    // Unique per-run prefix to avoid collisions in the shared bucket.
    let bucket_prefix =
        format!("test_expire_and_garbage_collect_{}", Utc::now().timestamp_millis());
    let backend: Arc<dyn Storage + Send + Sync> =
        common::make_r2_integration_storage(bucket_prefix)?;
    do_test_expire_and_garbage_collect(backend).await
}
#[tokio_test]
#[ignore = "needs credentials from env"]
pub async fn test_expire_and_garbage_collect_in_tigris()
-> Result<(), Box<dyn std::error::Error>> {
    // Unique per-run prefix to avoid collisions in the shared bucket.
    let bucket_prefix =
        format!("test_expire_and_garbage_collect_{}", Utc::now().timestamp_millis());
    let backend: Arc<dyn Storage + Send + Sync> =
        common::make_tigris_integration_storage(bucket_prefix)?;
    do_test_expire_and_garbage_collect(backend).await
}
/// Runs expiration followed by GC over the design-doc repo and checks that
/// history is squashed and garbage is removed only once the refs pinning it
/// are gone.
async fn do_test_expire_and_garbage_collect(
    storage: Arc<dyn Storage + Send + Sync>,
) -> Result<(), Box<dyn std::error::Error>> {
    let storage_settings = storage.default_settings().await?;
    let mut repo =
        Repository::create(None, Arc::clone(&storage), HashMap::new(), None, true)
            .await?;
    // Cutoff was captured right after commit "7" (see `make_design_doc_repo`),
    // so only commits 1-7 are expiration candidates.
    let expire_older_than = make_design_doc_repo(&mut repo).await?;
    // Fresh, uncached asset manager so expiration reads storage directly.
    let asset_manager = Arc::new(AssetManager::new_no_cache(
        Arc::clone(&storage),
        storage_settings.clone(),
        SpecVersionBin::current(),
        1,
        DEFAULT_MAX_CONCURRENT_REQUESTS,
    ));
    // Expire old snapshots; leave expired branches and tags in place (Ignore).
    let result = expire(
        Arc::clone(&asset_manager),
        expire_older_than,
        ExpiredRefAction::Ignore,
        ExpiredRefAction::Ignore,
        None,
        100,
    )
    .await?;
    assert_eq!(result.released_snapshots.len(), 5);
    assert_eq!(result.deleted_refs.len(), 0);
    // Re-open: each branch's history is squashed down to the commits newer
    // than the cutoff (plus the one surviving old tip and the initial
    // snapshot).
    let repo = Repository::open(None, Arc::clone(&storage), HashMap::new()).await?;
    assert_eq!(
        branch_commit_messages(&repo, "main").await,
        Vec::from(["14", "13", "12", "5", "Repository initialized"])
    );
    assert_eq!(
        branch_commit_messages(&repo, "develop").await,
        Vec::from(["11", "10", "Repository initialized"])
    );
    assert_eq!(
        branch_commit_messages(&repo, "test").await,
        Vec::from(["9", "Repository initialized"])
    );
    assert_eq!(
        branch_commit_messages(&repo, "qa").await,
        Vec::from(["8", "Repository initialized"])
    );
    // Tags still pin their snapshots, whose own ancestry was also squashed.
    assert_eq!(
        tag_commit_messages(&repo, "tag1").await,
        Vec::from(["3", "Repository initialized"])
    );
    assert_eq!(
        tag_commit_messages(&repo, "tag2").await,
        Vec::from(["5", "Repository initialized"])
    );
    // GC removes the 5 released snapshots; tag1/tag2 still protect theirs.
    let now = Utc::now();
    let gc_config = GCConfig::clean_all(
        now,
        now,
        None,
        NonZeroU16::new(50).unwrap(),
        NonZeroUsize::new(512 * 1024 * 1024).unwrap(),
        NonZeroU16::new(500).unwrap(),
        false,
    );
    let asset_manager = Arc::new(AssetManager::new_no_cache(
        Arc::clone(&storage),
        storage_settings.clone(),
        SpecVersionBin::current(),
        1,
        DEFAULT_MAX_CONCURRENT_REQUESTS,
    ));
    let summary =
        garbage_collect(Arc::clone(&asset_manager), &gc_config, None, 100).await?;
    assert_eq!(summary.snapshots_deleted, 5);
    assert_eq!(repo.asset_manager().list_snapshots().await?.count().await, 10);
    // Deleting tag1 releases commit "3", which nothing else references.
    repo.delete_tag("tag1").await?;
    let summary =
        garbage_collect(Arc::clone(&asset_manager), &gc_config, None, 100).await?;
    assert_eq!(summary.snapshots_deleted, 1);
    assert_eq!(repo.asset_manager().list_snapshots().await?.count().await, 9);
    // Deleting tag2 frees nothing: commit "5" is still in main's ancestry.
    repo.delete_tag("tag2").await?;
    let summary =
        garbage_collect(Arc::clone(&asset_manager), &gc_config, None, 100).await?;
    assert_eq!(summary.snapshots_deleted, 0);
    assert_eq!(repo.asset_manager().list_snapshots().await?.count().await, 9);
    Ok(())
}
/// Same design-doc scenario as `do_test_expire_and_garbage_collect`, but
/// expiration is allowed to DELETE fully-expired refs instead of ignoring
/// them, so more snapshots become collectible.
#[tokio_test]
pub async fn test_expire_and_garbage_collect_deleting_expired_refs()
-> Result<(), Box<dyn std::error::Error>> {
    let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
    let storage_settings = storage.default_settings().await?;
    let mut repo =
        Repository::create(None, Arc::clone(&storage), HashMap::new(), None, true)
            .await?;
    // Cutoff captured right after commit "7"; commits 1-7 are expirable.
    let expire_older_than = make_design_doc_repo(&mut repo).await?;
    // Uncached asset manager so expiration reads storage directly.
    let asset_manager = Arc::new(AssetManager::new_no_cache(
        Arc::clone(&storage),
        storage_settings.clone(),
        SpecVersionBin::current(),
        1,
        DEFAULT_MAX_CONCURRENT_REQUESTS,
    ));
    // Delete both expired branches and expired tags.
    let result = expire(
        Arc::clone(&asset_manager),
        expire_older_than,
        ExpiredRefAction::Delete,
        ExpiredRefAction::Delete,
        None,
        100,
    )
    .await?;
    // Two more snapshots released than in the Ignore case (7 vs 5), and two
    // refs removed — presumably tag1/tag2, whose targets are fully expired;
    // the branch tips are newer than the cutoff. TODO(review): confirm which
    // refs are deleted.
    assert_eq!(result.released_snapshots.len(), 7);
    assert_eq!(result.deleted_refs.len(), 2);
    let now = Utc::now();
    let gc_config = GCConfig::clean_all(
        now,
        now,
        None,
        NonZeroU16::new(50).unwrap(),
        NonZeroUsize::new(512 * 1024 * 1024).unwrap(),
        NonZeroU16::new(500).unwrap(),
        false,
    );
    // GC deletes the 7 released snapshots and their transaction logs.
    let summary =
        garbage_collect(Arc::clone(&asset_manager), &gc_config, None, 100).await?;
    assert_eq!(summary.snapshots_deleted, 7);
    assert_eq!(summary.transaction_logs_deleted, 7);
    assert_eq!(repo.asset_manager().list_snapshots().await?.count().await, 8);
    Ok(())
}
#[tokio::test]
async fn test_gc_reset_branch() -> Result<(), Box<dyn std::error::Error>> {
let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
let storage_settings = storage.default_settings().await?;
let asset_manager = Arc::new(AssetManager::new_no_cache(
Arc::clone(&storage),
storage_settings,
SpecVersionBin::current(),
1,
DEFAULT_MAX_CONCURRENT_REQUESTS,
));
let repo = Repository::create(None, Arc::clone(&storage), HashMap::new(), None, true)
.await?;
let mut session = repo.writable_session("main").await?;
let array_path: Path = "/array".to_string().try_into().unwrap();
let shape = ArrayShape::new(vec![(4, 4)]).unwrap();
let dimension_names = Some(vec!["t".into()]);
let def = Bytes::from_static(br#"{"this":"other array"}"#);
session
.add_array(
array_path.clone(),
shape.clone(),
dimension_names.clone(),
def.clone(),
)
.await?;
session.commit("initialized").max_concurrent_nodes(8).execute().await?;
let mut snaps = vec![];
for i in 0..6 {
let mut session = repo.writable_session("main").await?;
session
.set_chunk_ref(
array_path.clone(),
ChunkIndices(vec![0]),
Some(ChunkPayload::Inline(Bytes::from(format!("{i}")))),
)
.await?;
let snap = session
.commit(format!("commit {i}"))
.max_concurrent_nodes(8)
.execute()
.await?;
snaps.push(snap);
}
repo.reset_branch("main", &snaps[1], None).await?;
let before = repo.lookup_snapshot(&snaps[3]).await?.flushed_at;
let gc_config = GCConfig::clean_all(
before,
before,
None,
NonZeroU16::new(50).unwrap(),
NonZeroUsize::new(512 * 1024 * 1024).unwrap(),
NonZeroU16::new(500).unwrap(),
false,
);
let summary =
garbage_collect(Arc::clone(&asset_manager), &gc_config, None, 100).await?;
assert_eq!(summary.snapshots_deleted, 1);
repo.create_tag("foo", &snaps[3]).await?;
let _anc =
repo.ancestry(&VersionInfo::TagRef("foo".into())).await?.try_collect::<Vec<_>>();
repo.create_branch("zoo", &snaps[5]).await?;
let _anc = repo
.ancestry(&VersionInfo::BranchTipRef("zoo".into()))
.await?
.try_collect::<Vec<_>>();
let summary = garbage_collect(asset_manager, &gc_config, None, 100).await?;
assert_eq!(summary.snapshots_deleted, 0);
repo.readonly_session(&VersionInfo::SnapshotId(snaps[3].clone())).await?;
repo.readonly_session(&VersionInfo::SnapshotId(snaps[4].clone())).await?;
repo.readonly_session(&VersionInfo::SnapshotId(snaps[5].clone())).await?;
Ok(())
}
/// A branch whose tip is the same snapshot as main's tip is still deleted by
/// `expire` when branch expiration is set to `Delete`.
#[tokio_test]
pub async fn test_expire_deletes_branch_sharing_tip_with_main()
-> Result<(), Box<dyn std::error::Error>> {
    // In-memory repo with a single commit shared by "main" and "feature".
    let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
    let storage_settings = storage.default_settings().await?;
    let repo = Repository::create(None, Arc::clone(&storage), HashMap::new(), None, true)
        .await?;
    let mut writable = repo.writable_session("main").await?;
    writable.add_group(Path::root(), Bytes::new()).await?;
    let first_commit = writable.commit("first").execute().await?;
    repo.create_branch("feature", &first_commit).await?;
    // Both branches exist and point at the same snapshot.
    let branches = repo.list_branches().await?;
    assert!(branches.contains("main"));
    assert!(branches.contains("feature"));
    let expire_older_than = Utc::now();
    let asset_manager = Arc::new(AssetManager::new_no_cache(
        Arc::clone(&storage),
        storage_settings.clone(),
        SpecVersionBin::current(),
        1,
        DEFAULT_MAX_CONCURRENT_REQUESTS,
    ));
    // Expire with Delete for branches, Ignore for tags.
    let result = expire(
        Arc::clone(&asset_manager),
        expire_older_than,
        ExpiredRefAction::Delete,
        ExpiredRefAction::Ignore,
        None,
        100,
    )
    .await?;
    assert!(result.deleted_refs.contains(&Ref::Branch("feature".to_string())));
    // Re-open and confirm only "main" survived.
    let reopened = Repository::open(None, Arc::clone(&storage), HashMap::new()).await?;
    let remaining = reopened.list_branches().await?;
    assert!(remaining.contains("main"));
    assert!(!remaining.contains("feature"));
    Ok(())
}