#![cfg(not(feature = "shuttle"))]
#![allow(clippy::unwrap_used)]
use chrono::Utc;
use icechunk_macros::tokio_test;
use pretty_assertions::assert_eq;
use rstest::rstest;
use rstest_reuse::{self, *};
use std::{collections::HashMap, ops::Range, sync::Arc};
use bytes::Bytes;
use icechunk::{
Repository, RepositoryConfig, Storage,
format::{
ByteRange, ChunkIndices, Path, format_constants::SpecVersionBin,
snapshot::ArrayShape,
},
repository::VersionInfo,
session::{Session, get_chunk},
};
use tokio::task::JoinSet;
use crate::common;
use crate::common::Permission;
#[template]
#[rstest]
#[case::v1(SpecVersionBin::V1)]
#[case::v2(SpecVersionBin::V2)]
fn spec_version_cases(#[case] spec_version: SpecVersionBin) {}
const SIZE: usize = 10;
async fn mk_repo(
storage: Arc<dyn Storage + Send + Sync>,
init: bool,
spec_version: SpecVersionBin,
) -> Result<Repository, Box<dyn std::error::Error>> {
if init {
let config = RepositoryConfig {
inline_chunk_threshold_bytes: Some(0),
..RepositoryConfig::default()
};
Ok(Repository::create(
Some(config),
storage,
HashMap::new(),
Some(spec_version),
true,
)
.await?)
} else {
Ok(Repository::open(None, storage, HashMap::new()).await?)
}
}
async fn write_chunks(
repo: Repository,
xs: Range<u32>,
ys: Range<u32>,
) -> Result<Session, Box<dyn std::error::Error + Send + Sync>> {
let mut ds = repo.writable_session("main").await?;
for x in xs {
for y in ys.clone() {
let fx = x as f64;
let fy = y as f64;
let bytes: Vec<u8> = fx
.to_le_bytes()
.into_iter()
.chain(fy.to_le_bytes().into_iter())
.collect();
let payload =
ds.get_chunk_writer()?(Bytes::copy_from_slice(bytes.as_slice())).await?;
ds.set_chunk_ref(
"/array".try_into().unwrap(),
ChunkIndices(vec![x, y]),
Some(payload),
)
.await?;
}
}
Ok(ds)
}
async fn verify(ds: Session) -> Result<(), Box<dyn std::error::Error>> {
for x in 0..(SIZE / 2) as u32 {
for y in 0..(SIZE / 2) as u32 {
let bytes = get_chunk(
ds.get_chunk_reader(
&"/array".try_into().unwrap(),
&ChunkIndices(vec![x, y]),
&ByteRange::ALL,
)
.await?,
)
.await?;
assert!(bytes.is_some());
let bytes = bytes.unwrap();
let written_x = f64::from_le_bytes(bytes[0..8].try_into().unwrap());
let written_y = f64::from_le_bytes(bytes[8..16].try_into().unwrap());
assert_eq!(x as f64, written_x);
assert_eq!(y as f64, written_y);
}
}
Ok(())
}
#[tokio_test]
#[apply(spec_version_cases)]
async fn test_distributed_writes_in_minio(
#[case] spec_version: SpecVersionBin,
) -> Result<(), Box<dyn std::error::Error>> {
do_test_distributed_writes(spec_version, |prefix| async {
common::make_minio_integration_storage(prefix, &Permission::Modify)
})
.await
}
#[tokio_test]
#[apply(spec_version_cases)]
#[ignore = "needs credentials from env"]
async fn test_distributed_writes_in_aws(
#[case] spec_version: SpecVersionBin,
) -> Result<(), Box<dyn std::error::Error>> {
do_test_distributed_writes(spec_version, |prefix| async {
common::make_aws_integration_storage(prefix)
})
.await
}
#[tokio_test]
#[apply(spec_version_cases)]
#[ignore = "needs credentials from env"]
async fn test_distributed_writes_in_r2(
#[case] spec_version: SpecVersionBin,
) -> Result<(), Box<dyn std::error::Error>> {
do_test_distributed_writes(spec_version, |prefix| async {
common::make_r2_integration_storage(prefix)
})
.await
}
#[tokio_test]
#[apply(spec_version_cases)]
#[ignore = "needs credentials from env"]
async fn test_distributed_writes_in_tigris(
#[case] spec_version: SpecVersionBin,
) -> Result<(), Box<dyn std::error::Error>> {
do_test_distributed_writes(spec_version, |prefix| async {
common::make_tigris_integration_storage(prefix)
})
.await
}
async fn do_test_distributed_writes<F, Fut>(
spec_version: SpecVersionBin,
mk_storage: F,
) -> Result<(), Box<dyn std::error::Error>>
where
F: Fn(String) -> Fut,
Fut: Future<
Output = Result<Arc<dyn Storage + Send + Sync>, Box<dyn std::error::Error>>,
>,
{
let prefix = format!(
"test_distributed_writes_{:?}_{}",
spec_version,
Utc::now().timestamp_millis()
);
let storage1 = mk_storage(prefix.clone()).await?;
let storage2 = mk_storage(prefix.clone()).await?;
let storage3 = mk_storage(prefix.clone()).await?;
let storage4 = mk_storage(prefix.clone()).await?;
let repo1 = mk_repo(storage1, true, spec_version).await?;
let mut ds1 = repo1.writable_session("main").await?;
let shape =
ArrayShape::new(vec![(SIZE as u64, SIZE as u32), (SIZE as u64, SIZE as u32)])
.unwrap();
let user_data = Bytes::new();
let new_array_path: Path = "/array".try_into().unwrap();
ds1.add_array(new_array_path.clone(), shape, None, user_data).await?;
ds1.commit("create array").max_concurrent_nodes(8).execute().await?;
let repo2 = mk_repo(storage2, false, spec_version).await?;
let repo3 = mk_repo(storage3, false, spec_version).await?;
let repo4 = mk_repo(storage4, false, spec_version).await?;
let mut set = JoinSet::new();
#[expect(clippy::erasing_op, clippy::identity_op)]
{
let size2 = SIZE as u32;
let size24 = size2 / 4;
let xrange1 = size24 * 0..size24 * 1;
let xrange2 = size24 * 1..size24 * 2;
let xrange3 = size24 * 2..size24 * 3;
let xrange4 = size24 * 3..size24 * 4;
set.spawn(async move { write_chunks(repo1, xrange1, 0..size2).await });
set.spawn(async move { write_chunks(repo2, xrange2, 0..size2).await });
set.spawn(async move { write_chunks(repo3, xrange3, 0..size2).await });
set.spawn(async move { write_chunks(repo4, xrange4, 0..size2).await });
}
let mut write_results = set.join_all().await;
assert!(write_results.len() == 4);
assert!(write_results.iter().all(|r| {
r.as_ref()
.inspect_err(
#[expect(clippy::dbg_macro)]
|e| {
dbg!(e);
},
)
.is_ok()
}));
let mut ds1 = write_results.pop().unwrap().unwrap();
let ds2 = write_results.pop().unwrap().unwrap();
let ds3 = write_results.pop().unwrap().unwrap();
let ds4 = write_results.pop().unwrap().unwrap();
let raw_sessions: Vec<Vec<u8>> =
vec![ds2.as_bytes().unwrap(), ds3.as_bytes().unwrap(), ds4.as_bytes().unwrap()];
let sessions =
raw_sessions.into_iter().map(|bytes| Session::from_bytes(&bytes).unwrap());
for session in sessions {
ds1.merge(session).await?;
}
let _new_snapshot =
ds1.commit("distributed commit").max_concurrent_nodes(8).execute().await?;
verify(ds1).await?;
let storage = mk_storage(prefix).await?;
let repo = mk_repo(storage, false, spec_version).await?;
let ds =
repo.readonly_session(&VersionInfo::BranchTipRef("main".to_string())).await?;
verify(ds).await?;
Ok(())
}