use std::{
num::{NonZeroU16, NonZeroUsize},
sync::Arc,
};
use bytes::Bytes;
use chrono::Utc;
use icechunk::{
Repository, RepositoryConfig, Storage,
asset_manager::AssetManager,
config::DEFAULT_MAX_CONCURRENT_REQUESTS,
format::{
ChunkIndices, Path,
format_constants::SpecVersionBin,
manifest::{ChunkPayload, VirtualChunkLocation, VirtualChunkRef},
snapshot::ArrayShape,
},
new_in_memory_storage,
ops::stats::repo_chunks_storage,
};
use icechunk_macros::tokio_test;
use rstest::rstest;
use rstest_reuse::{self, *};
use crate::common;
use crate::common::Permission;
// rstest_reuse template: any test annotated with `#[apply(spec_version_cases)]`
// is instantiated once per on-disk spec version (currently V1 and V2), so every
// stats test below runs against both repository formats.
#[template]
#[rstest]
#[case::v1(SpecVersionBin::V1)]
#[case::v2(SpecVersionBin::V2)]
fn spec_version_cases(#[case] spec_version: SpecVersionBin) {}
/// Runs the chunk-storage stats scenario against an in-memory storage backend.
#[tokio_test]
#[apply(spec_version_cases)]
pub async fn test_repo_chunks_storage_in_memory(
    #[case] spec_version: SpecVersionBin,
) -> Result<(), Box<dyn std::error::Error>> {
    do_test_repo_chunks_storage(new_in_memory_storage().await?, spec_version).await
}
/// Runs the chunk-storage stats scenario against a MinIO integration bucket.
#[tokio_test]
#[apply(spec_version_cases)]
pub async fn test_repo_chunks_storage_in_minio(
    #[case] spec_version: SpecVersionBin,
) -> Result<(), Box<dyn std::error::Error>> {
    // Timestamped prefix keeps concurrent / repeated runs from colliding.
    let now_ms = Utc::now().timestamp_millis();
    let prefix = format!("test_stats_{:?}_{}", spec_version, now_ms);
    let storage = common::make_minio_integration_storage(prefix, &Permission::Modify)?;
    do_test_repo_chunks_storage(storage, spec_version).await
}
/// Runs the chunk-storage stats scenario against a real AWS S3 bucket.
#[tokio_test]
#[apply(spec_version_cases)]
#[ignore = "needs credentials from env"]
pub async fn test_repo_chunks_storage_in_aws(
    #[case] spec_version: SpecVersionBin,
) -> Result<(), Box<dyn std::error::Error>> {
    // Timestamped prefix keeps concurrent / repeated runs from colliding.
    let now_ms = Utc::now().timestamp_millis();
    let prefix = format!("test_stats_{:?}_{}", spec_version, now_ms);
    let storage = common::make_aws_integration_storage(prefix)?;
    do_test_repo_chunks_storage(storage, spec_version).await
}
/// Runs the chunk-storage stats scenario against a Tigris integration bucket.
#[tokio_test]
#[apply(spec_version_cases)]
#[ignore = "needs credentials from env"]
pub async fn test_repo_chunks_storage_in_tigris(
    #[case] spec_version: SpecVersionBin,
) -> Result<(), Box<dyn std::error::Error>> {
    // Timestamped prefix keeps concurrent / repeated runs from colliding.
    let now_ms = Utc::now().timestamp_millis();
    let prefix = format!("test_stats_{:?}_{}", spec_version, now_ms);
    let storage = common::make_tigris_integration_storage(prefix)?;
    do_test_repo_chunks_storage(storage, spec_version).await
}
/// Runs the chunk-storage stats scenario against a Cloudflare R2 bucket.
#[tokio_test]
#[apply(spec_version_cases)]
#[ignore = "needs credentials from env"]
pub async fn test_repo_chunks_storage_in_r2(
    #[case] spec_version: SpecVersionBin,
) -> Result<(), Box<dyn std::error::Error>> {
    // Timestamped prefix keeps concurrent / repeated runs from colliding.
    let now_ms = Utc::now().timestamp_millis();
    let prefix = format!("test_stats_{:?}_{}", spec_version, now_ms);
    let storage = common::make_r2_integration_storage(prefix)?;
    do_test_repo_chunks_storage(storage, spec_version).await
}
/// Drives `repo_chunks_storage` against the provided storage backend.
///
/// Creates a repository with a 5-byte inline-chunk threshold, then writes a
/// mix of chunk kinds across three commits — two on `main` and one on a `dev`
/// branch forked from the second commit — asserting the aggregated byte
/// counts after each step:
/// - native chunks: 6-byte payloads, above the inline threshold;
/// - inlined chunks: 1-byte payloads, at or below the threshold;
/// - virtual chunks: 100-byte references into `s3://foo/bar`.
///
/// Fix vs. previous revision: the `NonZeroU16` arguments to
/// `repo_chunks_storage` were built inconsistently (`new(5).unwrap()` next to
/// `try_from(10).unwrap()`); both now use `new(..).unwrap()`, with identical
/// behavior.
async fn do_test_repo_chunks_storage(
    storage: Arc<dyn Storage + Send + Sync>,
    spec_version: SpecVersionBin,
) -> Result<(), Box<dyn std::error::Error>> {
    let storage_settings = storage.default_settings().await?;
    // Uncached asset manager so the stats pass reads what is actually stored.
    let asset_manager = Arc::new(AssetManager::new_no_cache(
        Arc::clone(&storage),
        storage_settings.clone(),
        spec_version,
        1,
        DEFAULT_MAX_CONCURRENT_REQUESTS,
    ));
    let repo = Repository::create(
        Some(RepositoryConfig {
            // Payloads of 5 bytes or fewer are inlined into the manifest.
            inline_chunk_threshold_bytes: Some(5),
            ..Default::default()
        }),
        Arc::clone(&storage),
        Default::default(),
        Some(spec_version),
        true,
    )
    .await?;

    let mut session = repo.writable_session("main").await?;
    let user_data = Bytes::new();
    session.add_group(Path::root(), user_data.clone()).await?;
    let shape = ArrayShape::new(vec![(100, 100)]).unwrap();
    let array_path: Path = "/array".try_into().unwrap();
    session.add_array(array_path.clone(), shape, None, user_data.clone()).await?;

    // 50 native chunks of 6 bytes each (above the inline threshold).
    for idx in 0..50 {
        let bytes = Bytes::copy_from_slice(&[0, 1, 2, 3, 4, 5]);
        let payload = session.get_chunk_writer()?(bytes.clone()).await?;
        session
            .set_chunk_ref(array_path.clone(), ChunkIndices(vec![idx]), Some(payload))
            .await?;
    }
    // 10 inlined chunks of 1 byte each (below the inline threshold).
    for idx in 50..60 {
        let bytes = Bytes::copy_from_slice(&[0]);
        let payload = session.get_chunk_writer()?(bytes.clone()).await?;
        session
            .set_chunk_ref(array_path.clone(), ChunkIndices(vec![idx]), Some(payload))
            .await?;
    }
    // 10 virtual chunks of 100 bytes each, at distinct offsets.
    for idx in 60..70 {
        let payload = ChunkPayload::Virtual(VirtualChunkRef {
            location: VirtualChunkLocation::from_url("s3://foo/bar").unwrap(),
            offset: idx as u64,
            length: 100,
            checksum: None,
        });
        session
            .set_chunk_ref(array_path.clone(), ChunkIndices(vec![idx]), Some(payload))
            .await?;
    }

    // Nothing has been committed yet, so all counters must still be zero.
    let stats = repo_chunks_storage(
        Arc::clone(&asset_manager),
        NonZeroU16::new(5).unwrap(),
        NonZeroUsize::MIN,
        NonZeroU16::new(10).unwrap(),
    )
    .await
    .unwrap();
    assert_eq!(stats.native_bytes, 0);
    assert_eq!(stats.virtual_bytes, 0);
    assert_eq!(stats.inlined_bytes, 0);

    let _ = session.commit("first").max_concurrent_nodes(8).execute().await?;
    let stats = repo_chunks_storage(
        Arc::clone(&asset_manager),
        NonZeroU16::new(5).unwrap(),
        NonZeroUsize::MAX,
        NonZeroU16::new(10).unwrap(),
    )
    .await
    .unwrap();
    assert_eq!(stats.native_bytes, 50 * 6);
    assert_eq!(stats.inlined_bytes, 10);
    assert_eq!(stats.virtual_bytes, 10 * 100);

    // Second commit overwrites 10 native chunks; both versions remain stored,
    // so native bytes grow by another 10 * 6.
    let mut session = repo.writable_session("main").await?;
    for idx in 0..10 {
        let bytes = Bytes::copy_from_slice(&[0, 1, 2, 3, 4, 5]);
        let payload = session.get_chunk_writer()?(bytes.clone()).await?;
        session
            .set_chunk_ref(array_path.clone(), ChunkIndices(vec![idx]), Some(payload))
            .await?;
    }
    let second_commit =
        session.commit("second").max_concurrent_nodes(8).execute().await?;
    let stats = repo_chunks_storage(
        Arc::clone(&asset_manager),
        NonZeroU16::new(5).unwrap(),
        NonZeroUsize::MIN,
        NonZeroU16::new(10).unwrap(),
    )
    .await
    .unwrap();
    assert_eq!(stats.native_bytes, (50 + 10) * 6);
    // Inlined bytes also double-count the overwritten manifest entries.
    assert_eq!(stats.inlined_bytes, 10 + 10);
    // Virtual refs are unchanged and deduplicated by (location, offset, length).
    assert_eq!(stats.virtual_bytes, 10 * 100);

    // Third commit lands on a `dev` branch forked from the second commit:
    // 5 more native overwrites and 10 more inlined overwrites.
    repo.create_branch("dev", &second_commit).await.unwrap();
    let mut session = repo.writable_session("dev").await?;
    for idx in 0..5 {
        let bytes = Bytes::copy_from_slice(&[0, 1, 2, 3, 4, 5]);
        let payload = session.get_chunk_writer()?(bytes.clone()).await?;
        session
            .set_chunk_ref(array_path.clone(), ChunkIndices(vec![idx]), Some(payload))
            .await?;
    }
    for idx in 50..60 {
        let bytes = Bytes::copy_from_slice(&[0]);
        let payload = session.get_chunk_writer()?(bytes.clone()).await?;
        session
            .set_chunk_ref(array_path.clone(), ChunkIndices(vec![idx]), Some(payload))
            .await?;
    }
    let _ = session.commit("third").max_concurrent_nodes(8).execute().await?;
    let stats = repo_chunks_storage(
        Arc::clone(&asset_manager),
        NonZeroU16::new(5).unwrap(),
        NonZeroUsize::MAX,
        NonZeroU16::new(10).unwrap(),
    )
    .await
    .unwrap();
    assert_eq!(stats.native_bytes, 50 * 6 + 10 * 6 + 5 * 6);
    assert_eq!(stats.inlined_bytes, 10 + 10 + 10);
    assert_eq!(stats.virtual_bytes, 10 * 100);
    Ok(())
}
/// Verifies that identical virtual chunk references are deduplicated in the
/// storage stats: five extra refs sharing the (location, offset, length) of an
/// existing one must not inflate `virtual_bytes`.
#[tokio_test]
#[apply(spec_version_cases)]
pub async fn test_virtual_chunk_deduplication(
    #[case] spec_version: SpecVersionBin,
) -> Result<(), Box<dyn std::error::Error>> {
    let storage = new_in_memory_storage().await?;
    let storage_settings = storage.default_settings().await?;
    // Uncached asset manager so the stats pass reads what is actually stored.
    let asset_manager = Arc::new(AssetManager::new_no_cache(
        Arc::clone(&storage),
        storage_settings.clone(),
        spec_version,
        1,
        DEFAULT_MAX_CONCURRENT_REQUESTS,
    ));
    let config = RepositoryConfig {
        inline_chunk_threshold_bytes: Some(5),
        ..Default::default()
    };
    let repo =
        Repository::create(Some(config), storage, Default::default(), Some(spec_version), true)
            .await?;

    let mut session = repo.writable_session("main").await?;
    let user_data = Bytes::new();
    session.add_group(Path::root(), user_data.clone()).await?;
    let array_path: Path = "/array".try_into().unwrap();
    let shape = ArrayShape::new(vec![(100, 100)]).unwrap();
    session.add_array(array_path.clone(), shape, None, user_data.clone()).await?;

    // A 100-byte virtual reference into the same backing object, at `offset`.
    let virtual_ref = |offset: u64| {
        ChunkPayload::Virtual(VirtualChunkRef {
            location: VirtualChunkLocation::from_url("s3://bucket/file.dat").unwrap(),
            offset,
            length: 100,
            checksum: None,
        })
    };

    // Ten refs at distinct offsets: each counts toward the total.
    for idx in 0u32..10 {
        let payload = virtual_ref(u64::from(idx * 100));
        session
            .set_chunk_ref(array_path.clone(), ChunkIndices(vec![idx]), Some(payload))
            .await?;
    }
    // Five refs identical to the offset-0 one: these must be deduplicated.
    for idx in 10u32..15 {
        session
            .set_chunk_ref(array_path.clone(), ChunkIndices(vec![idx]), Some(virtual_ref(0)))
            .await?;
    }
    session.commit("first").max_concurrent_nodes(8).execute().await?;

    let stats = repo_chunks_storage(
        Arc::clone(&asset_manager),
        NonZeroU16::new(5).unwrap(),
        NonZeroUsize::MAX,
        NonZeroU16::new(10).unwrap(),
    )
    .await
    .unwrap();
    // Only the ten unique (location, offset, length) triples are counted.
    assert_eq!(stats.virtual_bytes, 10 * 100);
    assert_eq!(stats.native_bytes, 0);
    assert_eq!(stats.inlined_bytes, 0);
    Ok(())
}