use std::{
collections::HashMap,
num::{NonZeroU16, NonZeroUsize},
panic::AssertUnwindSafe,
sync::Arc,
};
use bytes::Bytes;
use chrono::Utc;
use futures::FutureExt as _;
use icechunk::{
Repository, RepositoryConfig, Storage,
config::{S3Credentials, S3Options, S3StaticCredentials},
format::{
ByteRange, ChunkIndices, Path, format_constants::SpecVersionBin,
snapshot::ArrayShape,
},
ops::gc::{GCConfig, garbage_collect},
repository::{RepositoryError, RepositoryErrorKind, VersionInfo},
session::get_chunk,
storage::{Settings, mk_client, s3_storage},
};
use icechunk_macros::tokio_test;
use crate::common;
/// Endpoint URL of the local S3-compatible test service that `rustfs_options` points at.
const ENDPOINT: &str = "http://localhost:4200";
/// Builds the `S3Options` used to reach the test service at `ENDPOINT`.
///
/// The options select region `us-east-1`, allow plain HTTP and force path-style
/// addressing, on top of `S3Options::default()`.
fn rustfs_options() -> S3Options {
    let regional = S3Options::default().with_region("us-east-1");
    let local = regional.with_endpoint_url(ENDPOINT);
    local.with_allow_http(true).with_force_path_style(true)
}
/// Wraps an access key pair into `S3Credentials::Static`.
///
/// The session token and expiry are both left as `None`.
fn static_credentials(access_key_id: &str, secret_access_key: &str) -> S3Credentials {
    let key_pair = S3StaticCredentials {
        access_key_id: access_key_id.to_owned(),
        secret_access_key: secret_access_key.to_owned(),
        session_token: None,
        expires_after: None,
    };
    S3Credentials::Static(key_pair)
}
/// Static credentials used by the helpers that talk to the `ENDPOINT` service.
fn root_credentials() -> S3Credentials {
    const ACCESS_KEY_ID: &str = "test123";
    const SECRET_ACCESS_KEY: &str = "test123";
    static_credentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY)
}
/// Opens an S3 storage handle on `bucket` at the `ENDPOINT` service.
///
/// * `prefix` is forwarded as-is (owned) to `s3_storage`; tests pass `Some("")` and
///   `None` to exercise the empty-prefix paths.
/// * `legacy_rooted_keys`: when `true`, `Some(true)` is passed as the last
///   `s3_storage` argument; otherwise `None` is passed.
///
/// The storage is built with `unsafe_allow_empty_prefix_creation()` applied.
///
/// # Panics
/// Panics if `s3_storage` returns an error.
fn root_storage(
    bucket: &str,
    prefix: Option<&str>,
    legacy_rooted_keys: bool,
) -> Arc<dyn Storage + Send + Sync> {
    let rooted_flag = if legacy_rooted_keys { Some(true) } else { None };
    let storage = s3_storage(
        rustfs_options(),
        bucket.to_owned(),
        prefix.map(str::to_owned),
        Some(root_credentials()),
        rooted_flag,
    )
    .unwrap()
    .unsafe_allow_empty_prefix_creation();
    Arc::new(storage)
}
/// Endpoint URL of the second local S3-compatible test service, used by `minio_options`.
const MINIO_ENDPOINT: &str = "http://localhost:4202";
/// Builds the `S3Options` used to reach the test service at `MINIO_ENDPOINT`.
///
/// Same shape as the options for `ENDPOINT`: region `us-east-1`, plain HTTP allowed,
/// path-style addressing forced.
fn minio_options() -> S3Options {
    let regional = S3Options::default().with_region("us-east-1");
    let local = regional.with_endpoint_url(MINIO_ENDPOINT);
    local.with_allow_http(true).with_force_path_style(true)
}
/// Static credentials used by the helpers that talk to the `MINIO_ENDPOINT` service.
fn minio_credentials() -> S3Credentials {
    const ACCESS_KEY_ID: &str = "minioadmin";
    const SECRET_ACCESS_KEY: &str = "minioadmin";
    static_credentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY)
}
/// Opens an S3 storage handle on `bucket` at the `MINIO_ENDPOINT` service.
///
/// `prefix` is forwarded (owned) to `s3_storage`, whose last argument is always
/// `None` here. The storage is built with `unsafe_allow_empty_prefix_creation()`
/// applied.
///
/// # Panics
/// Panics if `s3_storage` returns an error.
fn minio_storage(bucket: &str, prefix: Option<&str>) -> Arc<dyn Storage + Send + Sync> {
    let storage = s3_storage(
        minio_options(),
        bucket.to_owned(),
        prefix.map(str::to_owned),
        Some(minio_credentials()),
        None,
    )
    .unwrap()
    .unsafe_allow_empty_prefix_creation();
    Arc::new(storage)
}
/// Creates a new, uniquely named bucket on the `ENDPOINT` service and returns its name.
async fn fresh_bucket() -> String {
    let options = rustfs_options();
    create_fresh_bucket(&options, root_credentials()).await
}
/// Creates a new, uniquely named bucket on the `MINIO_ENDPOINT` service and returns its name.
async fn fresh_minio_bucket() -> String {
    let options = minio_options();
    create_fresh_bucket(&options, minio_credentials()).await
}
/// Creates a bucket with a fresh name on the service described by `options` and
/// returns that name.
///
/// The name is `testbucket-layout-` followed by the current UTC timestamp in
/// microseconds (zero-padded to 16 digits) and a random `u64` rendered as 16 hex
/// digits, so repeated calls do not reuse a bucket.
///
/// # Panics
/// Panics with `create_bucket` if the bucket creation request fails.
async fn create_fresh_bucket(options: &S3Options, credentials: S3Credentials) -> String {
    let timestamp_micros = Utc::now().timestamp_micros();
    let nonce: u64 = rand::random();
    let bucket = format!("testbucket-layout-{:016}-{:016x}", timestamp_micros, nonce);

    let settings = Settings::default();
    let client = mk_client(options, credentials, vec![], vec![], &settings).await;
    client.create_bucket().bucket(&bucket).send().await.expect("create_bucket");
    bucket
}
/// Lists the raw object keys stored in `bucket` on the `ENDPOINT` service.
///
/// Objects for which the listing reports no key are skipped.
///
/// NOTE(review): only a single `list_objects_v2` request is sent and no continuation
/// token is followed, so a truncated listing would be returned as-is. That is fine
/// for the handful of objects these tests write — confirm before reusing this helper
/// on larger buckets.
///
/// # Panics
/// Panics with `list_objects_v2` if the listing request fails.
async fn raw_keys(bucket: &str) -> Vec<String> {
    let options = rustfs_options();
    let settings = Settings::default();
    let client = mk_client(&options, root_credentials(), vec![], vec![], &settings).await;

    let listing =
        client.list_objects_v2().bucket(bucket).send().await.expect("list_objects_v2");

    let mut keys = Vec::new();
    for object in listing.contents() {
        if let Some(key) = object.key() {
            keys.push(key.to_string());
        }
    }
    keys
}
/// Creates a repository on `storage` with the requested `spec_version` and commits a
/// single chunk holding `value` via `write_one_chunk`.
///
/// The repository config sets `inline_chunk_threshold_bytes` to `Some(0)` —
/// presumably so the one-byte chunk is stored as its own object instead of being
/// inlined (the tests look for `chunks/` keys); confirm against the config docs.
///
/// # Errors
/// Propagates any error from repository creation or from the chunk write.
async fn create_repo_with_one_chunk(
    storage: Arc<dyn Storage + Send + Sync>,
    spec_version: SpecVersionBin,
    value: i8,
) -> Result<Repository, Box<dyn std::error::Error>> {
    let config =
        RepositoryConfig { inline_chunk_threshold_bytes: Some(0), ..Default::default() };
    let repo =
        Repository::create(Some(config), storage, HashMap::new(), Some(spec_version), true)
            .await?;
    write_one_chunk(&repo, value).await?;
    Ok(repo)
}
fn version_anchor(spec_version: SpecVersionBin) -> &'static str {
match spec_version {
SpecVersionBin::V2 => "repo",
SpecVersionBin::V1 => "refs/branch.main/ref.json",
}
}
/// Writes `value` as chunk `[0]` of `/array` on branch `main` and commits.
///
/// If looking up `/array` fails, the root group and the array itself are created
/// first, so the helper works both on a brand-new repository and on one that already
/// contains the array. The commit message is `write {value}`.
///
/// # Errors
/// Propagates any session, chunk-writer or commit error.
///
/// # Panics
/// Panics if the hard-coded path or array shape is rejected.
async fn write_one_chunk(
    repo: &Repository,
    value: i8,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut ds = repo.writable_session("main").await?;
    let array_path: Path = "/array".try_into().unwrap();

    let array_missing = ds.get_node(&array_path).await.is_err();
    if array_missing {
        ds.add_group(Path::root(), Bytes::new()).await?;
        let shape = ArrayShape::new(vec![(2, 1)]).unwrap();
        ds.add_array(array_path.clone(), shape, None, Bytes::new()).await?;
    }

    // The chunk body is the single big-endian byte of `value`.
    let encoded = Bytes::copy_from_slice(&value.to_be_bytes());
    let payload = ds.get_chunk_writer()?(encoded).await?;
    ds.set_chunk_ref(array_path, ChunkIndices(vec![0]), Some(payload)).await?;

    let message = format!("write {value}");
    ds.commit(message).execute().await?;
    Ok(())
}
/// Reads chunk `[0]` of `/array` at the tip of branch `main` and decodes its first
/// byte as an `i8`.
///
/// # Errors
/// Propagates any session or chunk-read error.
///
/// # Panics
/// Panics if the hard-coded path is rejected, if the chunk lookup yields `None`, or
/// if the chunk is empty.
async fn read_chunk0(repo: &Repository) -> Result<i8, Box<dyn std::error::Error>> {
    let version = VersionInfo::BranchTipRef(String::from("main"));
    let session = repo.readonly_session(&version).await?;

    let array_path: Path = "/array".try_into().unwrap();
    let coords = ChunkIndices(vec![0]);
    let reader = session.get_chunk_reader(&array_path, &coords, &ByteRange::ALL).await?;
    let bytes = get_chunk(reader).await?.unwrap();

    let first_byte = bytes[0];
    Ok(i8::from_be_bytes([first_byte]))
}
/// Creates a repository through an empty-string prefix, once per spec version, and
/// inspects the raw object keys that end up in the bucket.
///
/// Checks that at least one object was written, that no key starts with `/` (the
/// failure message calls this the #2239 bug), that the key from `version_anchor` is
/// present verbatim, and that some key starts with `chunks/`.
#[tokio_test]
async fn empty_prefix_writes_clean_keys() -> Result<(), Box<dyn std::error::Error>> {
    for spec_version in [SpecVersionBin::V1, SpecVersionBin::V2] {
        let bucket = fresh_bucket().await;
        let storage = root_storage(&bucket, Some(""), false);
        create_repo_with_one_chunk(Arc::clone(&storage), spec_version, 42).await?;

        let keys = raw_keys(&bucket).await;
        assert!(!keys.is_empty(), "repo wrote no objects");

        // Report the first offending key, if any.
        if let Some(key) = keys.iter().find(|candidate| candidate.starts_with('/')) {
            panic!("key {key:?} starts with a slash (the #2239 bug)");
        }

        let anchor = version_anchor(spec_version);
        let has_anchor = keys.iter().map(String::as_str).any(|k| k == anchor);
        assert!(has_anchor, "expected a clean `{anchor}` key, got {keys:?}");

        let has_chunk = keys.iter().any(|k| k.starts_with("chunks/"));
        assert!(has_chunk, "expected a clean `chunks/...` key, got {keys:?}");
    }
    Ok(())
}
/// For each spec version, writes a repository through an empty-string prefix and
/// reads the chunk back through a storage opened with no prefix at all (`None`),
/// expecting the stored value `7`.
#[tokio_test]
async fn empty_prefix_roundtrips() -> Result<(), Box<dyn std::error::Error>> {
    for spec_version in [SpecVersionBin::V1, SpecVersionBin::V2] {
        let bucket = fresh_bucket().await;

        let writer_storage = root_storage(&bucket, Some(""), false);
        create_repo_with_one_chunk(writer_storage, spec_version, 7).await?;

        let reader_storage = root_storage(&bucket, None, false);
        let repo = Repository::open(None, reader_storage, HashMap::new()).await?;
        let stored = read_chunk0(&repo).await?;
        assert_eq!(stored, 7);
    }
    Ok(())
}
/// Same round trip as the empty-prefix/no-prefix test, but against the service at
/// `MINIO_ENDPOINT` and for spec version V2 only: write through `Some("")`, read
/// back through `None`, expect `13`.
///
/// NOTE(review): the test name calls this service a "normalizing store" —
/// presumably it normalizes object keys on its own; confirm.
#[tokio_test]
async fn empty_prefix_roundtrips_on_normalizing_store()
-> Result<(), Box<dyn std::error::Error>> {
    let bucket = fresh_minio_bucket().await;

    let writer_storage = minio_storage(&bucket, Some(""));
    create_repo_with_one_chunk(writer_storage, SpecVersionBin::V2, 13).await?;

    let reader_storage = minio_storage(&bucket, None);
    let repo = Repository::open(None, reader_storage, HashMap::new()).await?;
    let stored = read_chunk0(&repo).await?;
    assert_eq!(stored, 13);
    Ok(())
}
#[tokio_test]
async fn empty_prefix_create_refuses_over_existing_repo()
-> Result<(), Box<dyn std::error::Error>> {
use SpecVersionBin::{V1, V2};
for (existing, new) in [(V1, V1), (V1, V2), (V2, V1), (V2, V2)] {
let bucket = fresh_bucket().await;
create_repo_with_one_chunk(root_storage(&bucket, Some(""), false), existing, 1)
.await?;
let err = Repository::create(
None,
root_storage(&bucket, Some(""), false),
HashMap::new(),
Some(new),
true,
)
.await
.unwrap_err();
assert!(
matches!(
err,
RepositoryError {
kind: RepositoryErrorKind::ParentDirectoryNotClean,
..
}
),
"expected ParentDirectoryNotClean for {existing:?} -> {new:?}, got {err:?}"
);
}
Ok(())
}
#[tokio_test]
async fn empty_prefix_nonexistent_repo() -> Result<(), Box<dyn std::error::Error>> {
let bucket = fresh_bucket().await;
let err =
Repository::open(None, root_storage(&bucket, Some(""), false), HashMap::new())
.await
.unwrap_err();
assert!(
matches!(
err,
RepositoryError { kind: RepositoryErrorKind::RepositoryDoesntExist, .. }
),
"expected RepositoryDoesntExist, got {err:?}"
);
Ok(())
}
/// Checks that garbage collection on an empty-prefix repository removes the chunk
/// object from the bucket, not just from its own bookkeeping.
///
/// Steps: write two commits (two chunk objects under `chunks/`), reset `main` back
/// to the first commit, run `garbage_collect` with a `GCConfig::clean_all`
/// configuration, then verify both the reported `chunks_deleted` count and the raw
/// bucket listing (the failure message calls this the orphan-bug regression).
#[tokio_test]
async fn empty_prefix_gc_actually_deletes_chunks()
-> Result<(), Box<dyn std::error::Error>> {
    let bucket = fresh_bucket().await;
    let storage = root_storage(&bucket, Some(""), false);
    let repo =
        create_repo_with_one_chunk(Arc::clone(&storage), SpecVersionBin::V2, 42).await?;

    // Remember the first commit so the branch can be rewound to it later.
    let first = repo.lookup_branch("main").await?;
    write_one_chunk(&repo, 7).await?;

    let keys_before = raw_keys(&bucket).await;
    let chunks_before =
        keys_before.iter().filter(|key| key.starts_with("chunks/")).count();
    assert_eq!(chunks_before, 2, "expected two distinct chunk objects");

    repo.reset_branch("main", &first, None).await?;

    let now = Utc::now();
    let gc_config = GCConfig::clean_all(
        now,
        now,
        None,
        NonZeroU16::new(50).unwrap(),
        NonZeroUsize::new(512 * 1024 * 1024).unwrap(),
        NonZeroU16::new(500).unwrap(),
        false,
    );
    let asset_manager = Arc::clone(repo.asset_manager());
    let summary = garbage_collect(asset_manager, &gc_config, None, 100).await?;
    assert_eq!(summary.chunks_deleted, 1, "GC should report one deleted chunk");

    let keys_after = raw_keys(&bucket).await;
    let chunks_after = keys_after.iter().filter(|key| key.starts_with("chunks/")).count();
    assert_eq!(
        chunks_after, 1,
        "the deleted chunk object must actually be gone from the bucket (orphan-bug regression)"
    );
    Ok(())
}
/// Runs `rooted_roundtrip_body` against `store`, cleaning up rooted keys both before
/// and after the body.
///
/// The body runs under `catch_unwind` so the trailing cleanup still happens when it
/// panics; the panic is then resumed. Outcome precedence:
/// 1. a panic in the body is re-raised,
/// 2. otherwise an error from the body is returned (the cleanup result is dropped),
/// 3. otherwise the result of the trailing cleanup is returned.
///
/// # Errors
/// Also fails early if the initial cleanup fails; the body is not run in that case.
async fn do_rooted_roundtrip(
    store: common::RealStore,
) -> Result<(), Box<dyn std::error::Error>> {
    store.cleanup_rooted_keys().await?;

    let outcome = AssertUnwindSafe(rooted_roundtrip_body(&store)).catch_unwind().await;
    let cleanup = store.cleanup_rooted_keys().await;

    let body_result = match outcome {
        Ok(body_result) => body_result,
        Err(payload) => std::panic::resume_unwind(payload),
    };
    body_result?;
    cleanup
}
/// The actual rooted-keys round trip, separated out so the caller can wrap it with
/// cleanup.
///
/// Creates a V2 repository on `store.rooted_storage(true)` and writes `42`; reopens
/// through `store.rooted_storage(false)`, reads `42` back and writes `7`; reopens
/// once more, reads `7` back, and finally runs `garbage_collect` with a
/// `GCConfig::clean_all` configuration.
///
/// # Errors
/// Propagates any storage, repository, read/write or GC error.
async fn rooted_roundtrip_body(
    store: &common::RealStore,
) -> Result<(), Box<dyn std::error::Error>> {
    let config =
        RepositoryConfig { inline_chunk_threshold_bytes: Some(0), ..Default::default() };
    let created = Repository::create(
        Some(config),
        store.rooted_storage(true)?,
        HashMap::new(),
        Some(SpecVersionBin::V2),
        false,
    )
    .await?;
    write_one_chunk(&created, 42).await?;

    let reopened =
        Repository::open(None, store.rooted_storage(false)?, HashMap::new()).await?;
    let first_read = read_chunk0(&reopened).await?;
    assert_eq!(first_read, 42);
    write_one_chunk(&reopened, 7).await?;

    let reopened_again =
        Repository::open(None, store.rooted_storage(false)?, HashMap::new()).await?;
    let second_read = read_chunk0(&reopened_again).await?;
    assert_eq!(second_read, 7);

    let now = Utc::now();
    let gc_config = GCConfig::clean_all(
        now,
        now,
        None,
        NonZeroU16::new(50).unwrap(),
        NonZeroUsize::new(512 * 1024 * 1024).unwrap(),
        NonZeroU16::new(500).unwrap(),
        false,
    );
    let asset_manager = Arc::clone(reopened_again.asset_manager());
    garbage_collect(asset_manager, &gc_config, None, 100).await?;
    Ok(())
}
/// Rooted-keys round trip against the store returned by `common::aws_real_store`.
///
/// Ignored by default because it needs credentials from the environment.
///
/// # Panics
/// Panics with `AWS_* env vars must be set` if the store cannot be built.
#[tokio_test]
#[ignore = "needs credentials from env"]
async fn rooted_roundtrip_in_aws() -> Result<(), Box<dyn std::error::Error>> {
    do_rooted_roundtrip(common::aws_real_store().expect("AWS_* env vars must be set")).await
}
/// Rooted-keys round trip against the store returned by `common::r2_real_store`.
///
/// Ignored by default because it needs credentials from the environment.
///
/// # Panics
/// Panics with `R2_* env vars must be set` if the store cannot be built.
#[tokio_test]
#[ignore = "needs credentials from env"]
async fn rooted_roundtrip_in_r2() -> Result<(), Box<dyn std::error::Error>> {
    do_rooted_roundtrip(common::r2_real_store().expect("R2_* env vars must be set")).await
}
/// Rooted-keys round trip against the store returned by `common::tigris_real_store`.
///
/// Ignored by default because it needs credentials from the environment.
///
/// # Panics
/// Panics with `TIGRIS_* env vars must be set` if the store cannot be built.
#[tokio_test]
#[ignore = "needs credentials from env"]
async fn rooted_roundtrip_in_tigris() -> Result<(), Box<dyn std::error::Error>> {
    do_rooted_roundtrip(
        common::tigris_real_store().expect("TIGRIS_* env vars must be set"),
    )
    .await
}