mod sharding_codec;
mod sharding_codec_builder;
mod sharding_options;
#[cfg(feature = "async")]
mod sharding_partial_decoder_async;
mod sharding_partial_decoder_sync;
mod sharding_partial_encoder;
use std::borrow::Cow;
use std::num::NonZeroU64;
use std::sync::Arc;
use crate::array::concurrency::calc_concurrency_outer_inner;
use crate::array::{
ArrayBytes, BytesRepresentation, ChunkShape, ChunkShapeTraits, CodecChain, DataType, FillValue,
RecommendedConcurrency, ravel_indices,
};
pub use sharding_codec::ShardingCodec;
pub use sharding_codec_builder::ShardingCodecBuilder;
pub use sharding_options::{ShardingCodecOptions, SubchunkWriteOrder};
#[cfg(feature = "async")]
pub(crate) use sharding_partial_decoder_async::AsyncShardingPartialDecoder;
pub(crate) use sharding_partial_decoder_sync::ShardingPartialDecoder;
use zarrs_codec::{
ArrayCodecTraits, ArrayToBytesCodecTraits, BytesPartialDecoderTraits, Codec, CodecError,
CodecOptions, CodecPluginV3, CodecTraitsV3,
};
use zarrs_metadata::v3::MetadataV3;
pub use zarrs_metadata_ext::codec::sharding::{
ShardingCodecConfiguration, ShardingCodecConfigurationV1, ShardingIndexLocation,
};
use zarrs_plugin::PluginCreateError;
use zarrs_storage::byte_range::ByteRange;
// Register the codec name aliases (`sharding_indexed` is the Zarr V3 identifier).
zarrs_plugin::impl_extension_aliases!(ShardingCodec, v3: "sharding_indexed");
// Register the sharding codec in the V3 codec plugin inventory so it can be
// instantiated from array metadata at runtime.
inventory::submit! {
CodecPluginV3::new::<ShardingCodec>()
}
impl CodecTraitsV3 for ShardingCodec {
/// Creates a [`ShardingCodec`] from Zarr V3 array metadata.
///
/// # Errors
/// Returns a [`PluginCreateError`] if the metadata `configuration` cannot be
/// parsed as a [`ShardingCodecConfiguration`] or the codec cannot be built
/// from it.
fn create(metadata: &MetadataV3) -> Result<Codec, PluginCreateError> {
let configuration: ShardingCodecConfiguration = metadata.to_typed_configuration()?;
let codec = Arc::new(ShardingCodec::new_with_configuration(&configuration)?);
// Sharding is an array-to-bytes codec.
Ok(Codec::ArrayToBytes(codec))
}
}
/// Computes the number of subchunks along each axis of a shard
/// (`shard_shape / subchunk_shape`, element-wise).
///
/// # Errors
/// Returns [`CodecError::Other`] if any axis of `subchunk_shape` does not
/// evenly divide the corresponding axis of `shard_shape`.
fn calculate_chunks_per_shard(
    shard_shape: &[NonZeroU64],
    subchunk_shape: &[NonZeroU64],
) -> Result<ChunkShape, CodecError> {
    std::iter::zip(shard_shape, subchunk_shape)
        .map(|(s, c)| {
            let s = s.get();
            let c = c.get();
            if s % c == 0 {
                // Safe construction: s and c are nonzero and c divides s, so s / c >= 1.
                // (Previously used `unsafe { NonZeroU64::new_unchecked }` without a
                // SAFETY comment; the safe form costs a single predictable branch.)
                Ok(NonZeroU64::new(s / c).expect("quotient of nonzero divisible values is nonzero"))
            } else {
                Err(CodecError::Other(
                    format!("invalid subchunk shape {subchunk_shape:?}, it must evenly divide shard shape {shard_shape:?}")
                ))
            }
        })
        .collect()
}
/// Returns the shape of the shard index array: `chunks_per_shard` with an extra
/// trailing axis of length 2 holding the (offset, size) pair for each subchunk.
fn sharding_index_shape(chunks_per_shard: &[NonZeroU64]) -> ChunkShape {
    let mut index_shape = Vec::with_capacity(chunks_per_shard.len() + 1);
    index_shape.extend(chunks_per_shard);
    // Safe construction of the constant 2 (previously an undocumented
    // `unsafe { new_unchecked }`); the expect can never fire.
    index_shape.push(NonZeroU64::new(2).expect("2 is nonzero"));
    ChunkShape::from(index_shape)
}
/// Computes the encoded byte size of the shard index.
///
/// # Errors
/// Returns [`CodecError::Other`] if the index codec chain does not produce a
/// fixed-size encoded representation (a variable-size index cannot be located
/// within a shard).
fn compute_index_encoded_size(
    index_codecs: &dyn ArrayToBytesCodecTraits,
    sharding_index_shape: &[NonZeroU64],
) -> Result<u64, CodecError> {
    // The index is a uint64 array whose fill value is the u64::MAX sentinel.
    let representation = index_codecs.encoded_representation(
        sharding_index_shape,
        &crate::array::data_type::uint64(),
        &FillValue::from(u64::MAX),
    )?;
    if let BytesRepresentation::FixedSize(size) = representation {
        Ok(size)
    } else {
        // BoundedSize and UnboundedSize are both rejected.
        Err(CodecError::Other(
            "the array index cannot include a variable size output codec".to_string(),
        ))
    }
}
/// Decodes an encoded shard index into a flat `Vec<u64>` of native-endian
/// (offset, size) pairs, one pair per subchunk.
fn decode_shard_index(
    encoded_shard_index: &[u8],
    index_shape: &[NonZeroU64],
    index_codecs: &dyn ArrayToBytesCodecTraits,
    options: &CodecOptions,
) -> Result<Vec<u64>, CodecError> {
    // The index is a uint64 array with the u64::MAX sentinel as fill value.
    let index_bytes = index_codecs
        .decode(
            Cow::Borrowed(encoded_shard_index),
            index_shape,
            &crate::array::data_type::uint64(),
            &FillValue::from(u64::MAX),
            options,
        )?
        .into_fixed()?;
    // Reinterpret the fixed-size byte buffer as native-endian u64 values.
    Ok(index_bytes
        .chunks_exact(8)
        .map(|chunk| u64::from_ne_bytes(chunk.try_into().expect("chunks_exact yields 8 bytes")))
        .collect())
}
/// Returns the byte range occupied by the encoded shard index within an
/// encoded shard, for the given index location.
///
/// # Errors
/// Propagates the error from [`compute_index_encoded_size`] (e.g. a
/// variable-size index codec chain).
fn get_index_byte_range(
    index_shape: &[NonZeroU64],
    index_codecs: &CodecChain,
    index_location: ShardingIndexLocation,
) -> Result<ByteRange, CodecError> {
    // Propagate the structured error directly; previously it was flattened
    // into `CodecError::Other(e.to_string())`, discarding the error variant.
    let index_encoded_size = compute_index_encoded_size(index_codecs, index_shape)?;
    Ok(match index_location {
        // Index at the start: the first `index_encoded_size` bytes.
        ShardingIndexLocation::Start => ByteRange::FromStart(0, Some(index_encoded_size)),
        // Index at the end: the last `index_encoded_size` bytes.
        ShardingIndexLocation::End => ByteRange::Suffix(index_encoded_size),
    })
}
/// Looks up the byte range of one subchunk from a decoded shard index.
///
/// Returns `Ok(None)` when `shard_index` is `None` (the shard does not exist).
fn subchunk_byte_range(
    shard_index: Option<&[u64]>,
    shard_shape: &[NonZeroU64],
    chunk_shape: &[NonZeroU64],
    chunk_indices: &[u64],
) -> Result<Option<ByteRange>, CodecError> {
    // No shard index at all means there is no shard to read from.
    let Some(shard_index) = shard_index else {
        return Ok(None);
    };
    let chunks_per_shard = calculate_chunks_per_shard(shard_shape, chunk_shape)?.to_array_shape();
    // Linearise the subchunk indices into the flat (offset, size) index.
    let flat_index = ravel_indices(chunk_indices, &chunks_per_shard).expect("inbounds indices");
    let flat_index = usize::try_from(flat_index).unwrap();
    let offset = shard_index[flat_index * 2];
    let size = shard_index[flat_index * 2 + 1];
    // NOTE(review): assumes the caller has handled the u64::MAX "missing
    // subchunk" sentinel; `offset + size` would overflow for such entries —
    // TODO confirm at call sites.
    Ok(Some(ByteRange::new(offset..offset + size)))
}
/// Produces the decoded bytes for a partial decode of a shard that does not
/// exist: every requested element is the fill value.
fn partial_decode_empty_shard<'a>(
    data_type: &DataType,
    fill_value: &FillValue,
    indexer: &dyn crate::array::Indexer,
) -> Result<ArrayBytes<'a>, CodecError> {
    let num_elements = indexer.len();
    ArrayBytes::new_fill_value(data_type, num_elements, fill_value).map_err(Into::into)
}
/// Splits the caller's concurrency target between the outer per-subchunk loop
/// and the inner codec chain.
///
/// Returns the subchunk (outer) concurrency limit and a copy of `options` with
/// its concurrent target set to the inner (per-codec) limit.
fn get_concurrent_target_and_codec_options(
    inner_codecs: &CodecChain,
    data_type: &DataType,
    subchunk_shape: &[NonZeroU64],
    chunks_per_shard: &[u64],
    options: &CodecOptions,
) -> Result<(usize, CodecOptions), CodecError> {
    // The number of subchunks bounds how much outer parallelism is useful.
    let num_chunks: u64 = chunks_per_shard.iter().product();
    let num_chunks = usize::try_from(num_chunks).unwrap();
    let outer_maximum = std::cmp::min(options.concurrent_target(), num_chunks);
    let (subchunk_concurrent_limit, codec_concurrent_target) = calc_concurrency_outer_inner(
        options.concurrent_target(),
        &RecommendedConcurrency::new_maximum(outer_maximum),
        &inner_codecs.recommended_concurrency(subchunk_shape, data_type)?,
    );
    Ok((
        subchunk_concurrent_limit,
        options.with_concurrent_target(codec_concurrent_target),
    ))
}
/// Reads and decodes the shard index through a sync bytes partial decoder.
///
/// Returns `Ok(None)` if the input handle has no bytes for the index range
/// (the shard does not exist).
fn decode_shard_index_partial_decoder(
    input_handle: &dyn BytesPartialDecoderTraits,
    index_codecs: &CodecChain,
    index_location: ShardingIndexLocation,
    shard_shape: &[NonZeroU64],
    subchunk_shape: &[NonZeroU64],
    options: &CodecOptions,
) -> Result<Option<Vec<u64>>, CodecError> {
    let chunks_per_shard = calculate_chunks_per_shard(shard_shape, subchunk_shape)?;
    let index_shape = sharding_index_shape(&chunks_per_shard);
    let index_byte_range = get_index_byte_range(&index_shape, index_codecs, index_location)?;
    // Fetch only the index bytes, then decode if the shard exists.
    input_handle
        .partial_decode(index_byte_range, options)?
        .map(|encoded| decode_shard_index(&encoded, &index_shape, index_codecs, options))
        .transpose()
}
#[cfg(feature = "async")]
/// Reads and decodes the shard index through an async bytes partial decoder.
///
/// Returns `Ok(None)` if the input handle has no bytes for the index range
/// (the shard does not exist).
async fn decode_shard_index_async_partial_decoder(
    input_handle: &dyn zarrs_codec::AsyncBytesPartialDecoderTraits,
    index_codecs: &CodecChain,
    index_location: ShardingIndexLocation,
    shard_shape: &[NonZeroU64],
    subchunk_shape: &[NonZeroU64],
    options: &CodecOptions,
) -> Result<Option<Vec<u64>>, CodecError> {
    let chunks_per_shard = calculate_chunks_per_shard(shard_shape, subchunk_shape)?;
    let index_shape = sharding_index_shape(&chunks_per_shard);
    let index_byte_range = get_index_byte_range(&index_shape, index_codecs, index_location)?;
    // Fetch only the index bytes asynchronously; decoding itself is synchronous.
    input_handle
        .partial_decode(index_byte_range, options)
        .await?
        .map(|encoded| decode_shard_index(&encoded, &index_shape, index_codecs, options))
        .transpose()
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};
use super::*;
use crate::array::codec::bytes_to_bytes::test_unbounded::TestUnboundedCodec;
use crate::array::{ArrayBytes, ArraySubset, data_type};
use zarrs_chunk_grid::Indexer;
use zarrs_codec::{ArrayToBytesCodecTraits, BytesToBytesCodecTraits, CodecSpecificOptions};
/// Concurrency target for the tests: all rayon threads when `parallel`,
/// otherwise single-threaded.
fn get_concurrent_target(parallel: bool) -> usize {
    match parallel {
        true => rayon::current_num_threads(),
        false => 1,
    }
}
/// Sharding configuration with 1x2x2 subchunks, `bytes` (little-endian) +
/// `gzip` inner codecs, `bytes` + `crc32c` index codecs, and the default
/// (end) index location.
const JSON_VALID2: &str = r#"{
"chunk_shape": [1, 2, 2],
"codecs": [
{
"name": "bytes",
"configuration": {
"endian": "little"
}
},
{
"name": "gzip",
"configuration": {
"level": 1
}
}
],
"index_codecs": [
{
"name": "bytes",
"configuration": {
"endian": "little"
}
},
{ "name": "crc32c" }
]
}"#;
/// Sharding configuration with 2x2 subchunks, a `bytes` (little-endian) inner
/// codec, a `bytes` index codec, and the shard index at the start of the shard.
const JSON_VALID3: &str = r#"{
"chunk_shape": [2, 2],
"codecs": [
{
"name": "bytes",
"configuration": {
"endian": "little"
}
}
],
"index_codecs": [
{
"name": "bytes",
"configuration": {
"endian": "little"
}
}
],
"index_location": "start"
}"#;
/// How much of the test shard consists of fill-value elements.
enum FillValueAmount {
// Some subchunks are entirely fill value, some are not.
Partial,
// Every element is the fill value (exercises the empty-shard path).
All,
// No element equals the fill value.
None,
}
/// Round-trips a 16^3 uint16 shard of 2^3 subchunks through the sharding codec
/// and validates both the decoded data and the layout of the shard index.
///
/// `fill_value_amount` controls how many subchunks are entirely fill value,
/// `unbounded` appends a codec with unbounded encoded size, `index_at_end`
/// selects the shard index location, and `subchunk_write_order` is applied via
/// codec-specific options during encoding.
fn codec_sharding_round_trip_impl(
options: &CodecOptions,
unbounded: bool,
index_at_end: bool,
fill_value_amount: &FillValueAmount,
mut bytes_to_bytes_codecs: Vec<Arc<dyn BytesToBytesCodecTraits>>,
subchunk_write_order: SubchunkWriteOrder,
) {
const NUM_AXES: usize = 3;
let chunk_size = 16;
let subchunk_size = 2;
let chunk_shape = vec![NonZeroU64::new(chunk_size).unwrap(); NUM_AXES];
let subchunk_shape = vec![NonZeroU64::new(subchunk_size).unwrap(); NUM_AXES];
let data_type = data_type::uint16();
let fill_value = FillValue::from(0u16);
let elem_size = std::mem::size_of::<u16>();
// Build the element data with the requested amount of fill-value coverage.
let elements: Vec<u16> = match fill_value_amount {
FillValueAmount::All => vec![0u16; chunk_shape.num_elements_usize()],
FillValueAmount::Partial => {
// Write ones into two regions: subset1 covers exactly one subchunk
// (0,0,0); subset2 (rows 5..7) straddles a subchunk boundary on axis 0
// and so touches two subchunks. Total: 3 non-empty subchunks.
let subset1 = ArraySubset::new_with_ranges(&[(0..2), (0..2), (0..2)]);
let subset2 = ArraySubset::new_with_ranges(&[(5..7), (0..2), (0..2)]);
let mut data = vec![0u16; chunk_shape.num_elements_usize()];
subset1
.iter_contiguous_byte_ranges(&[chunk_size; NUM_AXES], 2)
.unwrap()
.for_each(|r| {
// Convert byte offsets to element indices before filling.
let start = r.start as usize / elem_size;
let end = r.end as usize / elem_size;
data[start..end].fill(1);
});
subset2
.iter_contiguous_byte_ranges(&[chunk_size; NUM_AXES], 2)
.unwrap()
.for_each(|r| {
let start = r.start as usize / elem_size;
let end = r.end as usize / elem_size;
data[start..end].fill(1);
});
data
}
FillValueAmount::None => (1..(1 + chunk_shape.num_elements_usize() as u16)).collect(),
};
let bytes = crate::array::transmute_to_bytes_vec(elements);
let bytes: ArrayBytes = bytes.into();
// Optionally append a codec with an unbounded encoded size.
if unbounded {
bytes_to_bytes_codecs.push(Arc::new(TestUnboundedCodec::new()));
}
// Build the sharding codec under test.
let codec: ShardingCodec = ShardingCodecBuilder::new(subchunk_shape, &data_type::uint16())
.index_location(if index_at_end {
ShardingIndexLocation::End
} else {
ShardingIndexLocation::Start
})
.bytes_to_bytes_codecs(bytes_to_bytes_codecs)
.build();
// Encode with the requested subchunk write order supplied through
// codec-specific options.
let encoded = Arc::new(codec.clone())
.with_codec_specific_options(&CodecSpecificOptions::default().with_option(
ShardingCodecOptions::default().with_subchunk_write_order(subchunk_write_order),
))
.encode(
bytes.clone(),
&chunk_shape,
&data_type,
&fill_value,
options,
)
.unwrap();
let decoded = codec
.decode(
encoded.clone(),
&chunk_shape,
&data_type,
&fill_value,
options,
)
.unwrap();
assert_eq!(bytes, decoded);
// The encoded shard must differ from the raw decoded bytes.
assert_ne!(encoded, decoded.into_fixed().unwrap());
// Decode the shard index: 8^3 subchunks, each with an (offset, size) pair.
let index = codec
.decode_index(
&encoded,
&[NonZeroU64::new(chunk_size / subchunk_size).unwrap(); NUM_AXES],
options,
)
.unwrap();
match fill_value_amount {
FillValueAmount::None => match codec.options.subchunk_write_order() {
SubchunkWriteOrder::Random => (),
SubchunkWriteOrder::C => {
// With C write order, offsets must already be ascending: sorting
// the (offset, len) pairs by offset must be a no-op.
let mut offset_with_len = index.chunks(2).collect::<Vec<&[u64]>>();
offset_with_len.sort_by_key(|x| x[0]);
assert_eq!(
offset_with_len
.into_iter()
.flat_map(|e| e.iter().copied())
.collect::<Vec<u64>>(),
index
);
}
},
FillValueAmount::Partial => match codec.options.subchunk_write_order() {
SubchunkWriteOrder::Random => (),
SubchunkWriteOrder::C => {
// Ignore u64::MAX sentinel entries (empty subchunks); the rest
// must be offset-ordered as above.
let filtered_index = index
.into_iter()
.filter(|e| *e != u64::MAX)
.collect::<Vec<u64>>();
let mut offset_with_len = filtered_index.chunks(2).collect::<Vec<&[u64]>>();
offset_with_len.sort_by_key(|x| x[0]);
assert_eq!(
offset_with_len
.into_iter()
.flat_map(|e| e.iter().copied())
.collect::<Vec<u64>>(),
filtered_index
);
// 3 non-empty subchunks -> 6 index entries.
assert_eq!(filtered_index.len(), 6);
}
},
// All fill value: every index entry is the u64::MAX sentinel.
FillValueAmount::All => assert_eq!(index, vec![u64::MAX; 8 * 8 * 8 * 2]),
}
}
#[test]
fn codec_sharding_round_trip1() {
    // Sweep every combination of write order, index location, fill-value
    // coverage, unbounded output codec, and parallelism.
    for write_order in [SubchunkWriteOrder::C, SubchunkWriteOrder::Random] {
        for index_at_end in [true, false] {
            for fill_value_amount in [
                FillValueAmount::All,
                FillValueAmount::None,
                FillValueAmount::Partial,
            ] {
                for unbounded in [true, false] {
                    for parallel in [true, false] {
                        let options = CodecOptions::default()
                            .with_concurrent_target(get_concurrent_target(parallel));
                        codec_sharding_round_trip_impl(
                            &options,
                            unbounded,
                            index_at_end,
                            &fill_value_amount,
                            vec![],
                            write_order,
                        );
                    }
                }
            }
        }
    }
}
#[cfg(feature = "gzip")]
#[cfg(feature = "crc32c")]
#[test]
fn codec_sharding_round_trip2() {
    use crate::array::codec::{Crc32cCodec, GzipCodec};
    // Same sweep as round_trip1, but with gzip + crc32c bytes-to-bytes codecs.
    for write_order in [SubchunkWriteOrder::C, SubchunkWriteOrder::Random] {
        for index_at_end in [true, false] {
            for fill_value_amount in [
                FillValueAmount::All,
                FillValueAmount::None,
                FillValueAmount::Partial,
            ] {
                for unbounded in [true, false] {
                    for parallel in [true, false] {
                        let options = CodecOptions::default()
                            .with_concurrent_target(get_concurrent_target(parallel));
                        let extra_codecs: Vec<Arc<dyn BytesToBytesCodecTraits>> = vec![
                            Arc::new(GzipCodec::new(5).unwrap()),
                            Arc::new(Crc32cCodec::new()),
                        ];
                        codec_sharding_round_trip_impl(
                            &options,
                            unbounded,
                            index_at_end,
                            &fill_value_amount,
                            extra_codecs,
                            write_order,
                        );
                    }
                }
            }
        }
    }
}
#[cfg(feature = "async")]
/// Round-trips a 4x4 uint16 shard of 2x2 subchunks through the sharding codec
/// (encode + decode are sync; this helper exists for the async test driver)
/// and checks the decoded bytes match the input.
async fn codec_sharding_async_round_trip_impl(
options: &CodecOptions,
unbounded: bool,
index_at_end: bool,
all_fill_value: bool,
mut bytes_to_bytes_codecs: Vec<Arc<dyn BytesToBytesCodecTraits>>,
) {
let shape = vec![NonZeroU64::new(4).unwrap(); 2];
let data_type = data_type::uint16();
let fill_value = FillValue::from(0u16);
// All-fill-value input exercises the empty-shard encoding path.
let elements: Vec<u16> = if all_fill_value {
vec![0; shape.num_elements_usize()]
} else {
(0..shape.num_elements_usize() as u16).collect()
};
let bytes = crate::array::transmute_to_bytes_vec(elements);
let bytes: ArrayBytes = bytes.into();
// Optionally append a codec with an unbounded encoded size.
if unbounded {
bytes_to_bytes_codecs.push(Arc::new(TestUnboundedCodec::new()));
}
let codec =
ShardingCodecBuilder::new(vec![NonZeroU64::new(2).unwrap(); 2], &data_type::uint16())
.index_location(if index_at_end {
ShardingIndexLocation::End
} else {
ShardingIndexLocation::Start
})
.bytes_to_bytes_codecs(bytes_to_bytes_codecs)
.build();
let encoded = codec
.encode(bytes.clone(), &shape, &data_type, &fill_value, options)
.unwrap();
let decoded = codec
.decode(encoded.clone(), &shape, &data_type, &fill_value, options)
.unwrap();
assert_eq!(bytes, decoded);
// The encoded shard must differ from the raw decoded bytes.
assert_ne!(encoded, decoded.into_fixed().unwrap());
}
#[cfg(feature = "async")]
#[tokio::test]
/// Sweeps index location, fill-value coverage, unbounded codec, and
/// parallelism through the async round-trip helper.
async fn codec_sharding_async_round_trip() {
    for index_at_end in [true, false] {
        for all_fill_value in [true, false] {
            for unbounded in [true, false] {
                for parallel in [true, false] {
                    let concurrent_target = get_concurrent_target(parallel);
                    let options =
                        CodecOptions::default().with_concurrent_target(concurrent_target);
                    // Arguments are passed in signature order
                    // (`unbounded`, `index_at_end`, `all_fill_value`); the
                    // previous call site swapped the last two booleans. All
                    // combinations were still covered either way, but the
                    // names did not line up with the parameters.
                    codec_sharding_async_round_trip_impl(
                        &options,
                        unbounded,
                        index_at_end,
                        all_fill_value,
                        vec![],
                    )
                    .await;
                }
            }
        }
    }
}
/// Partially decodes region [1..3, 0..1] of an encoded 4x4 uint8 shard and
/// checks the two returned elements.
///
/// For row-major 0..16 data those elements are 4 and 8; for an all-fill-value
/// shard they are 0 and 0.
fn codec_sharding_partial_decode(
options: &CodecOptions,
unbounded: bool,
index_at_end: bool,
all_fill_value: bool,
) {
let chunk_shape: ChunkShape = vec![NonZeroU64::new(4).unwrap(); 2];
let data_type = data_type::uint8();
let fill_value = FillValue::from(0u8);
let elements: Vec<u8> = if all_fill_value {
vec![0; chunk_shape.num_elements_usize()]
} else {
(0..chunk_shape.num_elements_usize() as u8).collect()
};
// Expected elements for the decoded region below.
let answer: Vec<u8> = if all_fill_value {
vec![0, 0]
} else {
vec![4, 8]
};
let bytes: ArrayBytes = elements.into();
// Optionally append a codec with an unbounded encoded size.
let bytes_to_bytes_codecs: Vec<Arc<dyn BytesToBytesCodecTraits>> = if unbounded {
vec![Arc::new(TestUnboundedCodec::new())]
} else {
vec![]
};
// Subchunk shape equals the shard shape here, so the shard holds one subchunk.
let codec = Arc::new(
ShardingCodecBuilder::new(chunk_shape.clone(), &data_type)
.index_location(if index_at_end {
ShardingIndexLocation::End
} else {
ShardingIndexLocation::Start
})
.bytes_to_bytes_codecs(bytes_to_bytes_codecs)
.build(),
);
let encoded = codec
.encode(
bytes.clone(),
&chunk_shape,
&data_type,
&fill_value,
options,
)
.unwrap();
let decoded_region = ArraySubset::new_with_ranges(&[1..3, 0..1]);
let input_handle = Arc::new(encoded);
let partial_decoder = codec
.partial_decoder(input_handle, &chunk_shape, &data_type, &fill_value, options)
.unwrap();
let decoded_partial_chunk = partial_decoder
.partial_decode(&decoded_region, options)
.unwrap();
// Reinterpret the fixed bytes as u8 elements.
let decoded_partial_chunk: Vec<u8> = decoded_partial_chunk
.into_fixed()
.unwrap()
.as_chunks::<1>()
.0
.iter()
.map(|b| u8::from_ne_bytes(*b))
.collect();
assert_eq!(answer, decoded_partial_chunk);
}
#[test]
/// Sweeps index location, fill-value coverage, unbounded codec, and
/// parallelism through the sync partial-decode helper.
fn codec_sharding_partial_decode_all() {
    for index_at_end in [true, false] {
        for all_fill_value in [true, false] {
            for unbounded in [true, false] {
                for parallel in [true, false] {
                    let concurrent_target = get_concurrent_target(parallel);
                    let options =
                        CodecOptions::default().with_concurrent_target(concurrent_target);
                    // Arguments are passed in signature order
                    // (`unbounded`, `index_at_end`, `all_fill_value`); the
                    // previous call site swapped the last two booleans. All
                    // combinations were still covered either way, but the
                    // names did not line up with the parameters.
                    codec_sharding_partial_decode(
                        &options,
                        unbounded,
                        index_at_end,
                        all_fill_value,
                    );
                }
            }
        }
    }
}
#[cfg(feature = "async")]
/// Async variant of the partial decode test: partially decodes region
/// [1..3, 0..1] of an encoded 4x4 uint8 shard (2x2 subchunks) and checks the
/// two returned elements (4 and 8 for row-major data, 0 and 0 for fill value).
async fn codec_sharding_async_partial_decode(
options: &CodecOptions,
unbounded: bool,
index_at_end: bool,
all_fill_value: bool,
) {
let chunk_shape: ChunkShape = vec![NonZeroU64::new(4).unwrap(); 2];
let data_type = data_type::uint8();
let fill_value = FillValue::from(0u8);
let elements: Vec<u8> = if all_fill_value {
vec![0; chunk_shape.num_elements_usize()]
} else {
(0..chunk_shape.num_elements_usize() as u8).collect()
};
// Expected elements for the decoded region below.
let answer: Vec<u8> = if all_fill_value {
vec![0, 0]
} else {
vec![4, 8]
};
let bytes: ArrayBytes = elements.into();
// Optionally append a codec with an unbounded encoded size.
let bytes_to_bytes_codecs: Vec<Arc<dyn BytesToBytesCodecTraits>> = if unbounded {
vec![Arc::new(TestUnboundedCodec::new())]
} else {
vec![]
};
let codec = Arc::new(
ShardingCodecBuilder::new(vec![NonZeroU64::new(2).unwrap(); 2], &data_type::uint8())
.index_location(if index_at_end {
ShardingIndexLocation::End
} else {
ShardingIndexLocation::Start
})
.bytes_to_bytes_codecs(bytes_to_bytes_codecs)
.build(),
);
let encoded = codec
.encode(
bytes.clone(),
&chunk_shape,
&data_type,
&fill_value,
options,
)
.unwrap();
let decoded_region = ArraySubset::new_with_ranges(&[1..3, 0..1]);
let input_handle = Arc::new(encoded);
let partial_decoder = codec
.async_partial_decoder(input_handle, &chunk_shape, &data_type, &fill_value, options)
.await
.unwrap();
let decoded_partial_chunk = partial_decoder
.partial_decode(&decoded_region, options)
.await
.unwrap()
.into_fixed()
.unwrap();
// Reinterpret the fixed bytes as u8 elements.
let decoded_partial_chunk: Vec<u8> = decoded_partial_chunk
.as_chunks::<1>()
.0
.iter()
.map(|b| u8::from_ne_bytes(*b))
.collect();
assert_eq!(answer, decoded_partial_chunk);
}
#[cfg(feature = "async")]
#[tokio::test]
/// Sweeps index location, fill-value coverage, unbounded codec, and
/// parallelism through the async partial-decode helper.
async fn codec_sharding_async_partial_decode_all() {
    for index_at_end in [true, false] {
        for all_fill_value in [true, false] {
            for unbounded in [true, false] {
                for parallel in [true, false] {
                    let concurrent_target = get_concurrent_target(parallel);
                    let options =
                        CodecOptions::default().with_concurrent_target(concurrent_target);
                    // Arguments are passed in signature order
                    // (`unbounded`, `index_at_end`, `all_fill_value`); the
                    // previous call site swapped the last two booleans. All
                    // combinations were still covered either way, but the
                    // names did not line up with the parameters.
                    codec_sharding_async_partial_decode(
                        &options,
                        unbounded,
                        index_at_end,
                        all_fill_value,
                    )
                    .await;
                }
            }
        }
    }
}
#[cfg(feature = "gzip")]
#[cfg(feature = "crc32c")]
#[test]
/// Partial decode through the JSON_VALID2 configuration (1x2x2 subchunks,
/// gzip inner codec, crc32c index codec); checks the cached index size and
/// the decoded region contents.
fn codec_sharding_partial_decode2() {
    let chunk_shape: ChunkShape = vec![
        NonZeroU64::new(2).unwrap(),
        NonZeroU64::new(4).unwrap(),
        NonZeroU64::new(4).unwrap(),
    ];
    let data_type = data_type::uint16();
    let fill_value = FillValue::from(0u16);
    let elements: Vec<u16> = (0..chunk_shape.num_elements_usize() as u16).collect();
    let bytes: ArrayBytes = crate::array::transmute_to_bytes_vec(elements).into();
    let configuration: ShardingCodecConfiguration = serde_json::from_str(JSON_VALID2).unwrap();
    let codec = Arc::new(ShardingCodec::new_with_configuration(&configuration).unwrap());
    let encoded = codec
        .encode(
            bytes,
            &chunk_shape,
            &data_type,
            &fill_value,
            &CodecOptions::default(),
        )
        .unwrap();
    let decoded_region = ArraySubset::new_with_ranges(&[1..2, 0..2, 0..3]);
    let input_handle = Arc::new(encoded);
    let partial_decoder = codec
        .partial_decoder(
            input_handle.clone(),
            &chunk_shape,
            &data_type,
            &fill_value,
            &CodecOptions::default(),
        )
        .unwrap();
    // The partial decoder additionally holds the decoded shard index:
    // 2x2x2 subchunks, each with an (offset, size) pair of u64.
    assert_eq!(
        partial_decoder.size_held(),
        input_handle.size_held() + size_of::<u64>() * 2 * 2 * 2 * 2
    );
    let decoded_partial_chunk = partial_decoder
        .partial_decode(&decoded_region, &CodecOptions::default())
        .unwrap();
    println!("decoded_partial_chunk {decoded_partial_chunk:?}");
    // Reinterpret the fixed bytes as u16 elements.
    let decoded_elements: Vec<u16> = decoded_partial_chunk
        .into_fixed()
        .unwrap()
        .as_chunks::<2>()
        .0
        .iter()
        .map(|b| u16::from_ne_bytes(*b))
        .collect();
    let answer: Vec<u16> = vec![16, 17, 18, 20, 21, 22];
    assert_eq!(answer, decoded_elements);
}
#[test]
/// Partial decode through the JSON_VALID3 configuration (2x2 subchunks, index
/// at the start of the shard); checks the cached index size and the decoded
/// region contents.
fn codec_sharding_partial_decode3() {
    let chunk_shape: ChunkShape = vec![NonZeroU64::new(4).unwrap(); 2];
    let data_type = data_type::uint8();
    let fill_value = FillValue::from(0u8);
    let elements: Vec<u8> = (0..chunk_shape.num_elements_usize() as u8).collect();
    let bytes: ArrayBytes = elements.into();
    let configuration: ShardingCodecConfiguration = serde_json::from_str(JSON_VALID3).unwrap();
    let codec = Arc::new(ShardingCodec::new_with_configuration(&configuration).unwrap());
    let encoded = codec
        .encode(
            bytes,
            &chunk_shape,
            &data_type,
            &fill_value,
            &CodecOptions::default(),
        )
        .unwrap();
    let decoded_region = ArraySubset::new_with_ranges(&[1..3, 0..1]);
    let input_handle = Arc::new(encoded);
    let partial_decoder = codec
        .partial_decoder(
            input_handle.clone(),
            &chunk_shape,
            &data_type,
            &fill_value,
            &CodecOptions::default(),
        )
        .unwrap();
    // The partial decoder additionally holds the decoded shard index:
    // 2x2 subchunks, each with an (offset, size) pair of u64.
    assert_eq!(
        partial_decoder.size_held(),
        input_handle.size_held() + size_of::<u64>() * 2 * 2 * 2
    );
    // Reinterpret the fixed bytes as u8 elements.
    let decoded_elements: Vec<u8> = partial_decoder
        .partial_decode(&decoded_region, &CodecOptions::default())
        .unwrap()
        .into_fixed()
        .unwrap()
        .as_chunks::<1>()
        .0
        .iter()
        .map(|b| u8::from_ne_bytes(*b))
        .collect();
    let answer: Vec<u8> = vec![4, 8];
    assert_eq!(answer, decoded_elements);
}
#[test]
/// Verifies shard compaction: a partial re-encode of one subchunk grows the
/// shard (the stale subchunk bytes remain); `compact` then shrinks it back to
/// the original size, preserves the data, and is a no-op on already-compact
/// shards.
fn codec_sharding_compact() {
let chunk_shape: ChunkShape = vec![NonZeroU64::new(4).unwrap(); 2];
let data_type = data_type::uint16();
let fill_value = FillValue::from(0u16);
let elements: Vec<u16> = (0..chunk_shape.num_elements_usize() as u16).collect();
let bytes = crate::array::transmute_to_bytes_vec(elements);
let original_bytes: ArrayBytes = bytes.into();
let codec_configuration: ShardingCodecConfiguration =
serde_json::from_str(JSON_VALID3).unwrap();
let codec = Arc::new(ShardingCodec::new_with_configuration(&codec_configuration).unwrap());
let original_encoded = codec
.encode(
original_bytes.clone(),
&chunk_shape,
&data_type,
&fill_value,
&CodecOptions::default(),
)
.unwrap();
let original_size = original_encoded.len();
// Sanity check: the freshly encoded shard round-trips.
let decoded = codec
.decode(
original_encoded.clone(),
&chunk_shape,
&data_type,
&fill_value,
&CodecOptions::default(),
)
.unwrap();
assert_eq!(original_bytes, decoded);
// In-memory input/output handle for the partial encoder.
let input_output_handle = Arc::new(Mutex::new(Some(original_encoded.to_vec())));
{
// Overwrite one 2x2 subchunk (rows 0..2, cols 2..4) with new values.
let subchunk_subset = ArraySubset::new_with_ranges(&[0..2, 2..4]);
let updated_elements: Vec<u16> = vec![100, 101, 102, 103]; let updated_bytes = crate::array::transmute_to_bytes_vec(updated_elements);
let partial_encoder = codec
.clone()
.partial_encoder(
input_output_handle.clone(),
&chunk_shape,
&data_type,
&fill_value,
&CodecOptions::default(),
)
.unwrap();
partial_encoder
.partial_encode(
&subchunk_subset,
&ArrayBytes::from(updated_bytes),
&CodecOptions::default(),
)
.unwrap();
}
// Reclaim the updated shard bytes from the handle.
let updated_encoded = Arc::try_unwrap(input_output_handle)
.unwrap()
.into_inner()
.expect("single ref")
.expect("non-empty shard");
let updated_size = updated_encoded.len();
// The partial encode appends; the stale subchunk bytes are not reclaimed.
assert!(
updated_size > original_size,
"Updated shard size ({updated_size}) should be > original size ({original_size})"
);
let compacted = codec
.compact(
updated_encoded.into(),
&chunk_shape,
&data_type,
&fill_value,
&CodecOptions::default(),
)
.unwrap();
let compacted = compacted.expect("compaction should have occurred");
let compacted_size = compacted.len();
// Compaction reclaims the stale bytes exactly.
assert_eq!(
compacted_size, original_size,
"Compacted size ({compacted_size}) should be equal to original size ({original_size})"
);
let decoded_after_compact = codec
.decode(
compacted.clone(),
&chunk_shape,
&data_type,
&fill_value,
&CodecOptions::default(),
)
.unwrap();
// The compacted shard decodes to the original data with the subchunk update
// applied (flat indices 2, 3, 6, 7 correspond to rows 0..2, cols 2..4).
let mut expected_elements: Vec<u16> =
(0..chunk_shape.num_elements_usize() as u16).collect();
expected_elements[2] = 100; expected_elements[3] = 101; expected_elements[6] = 102; expected_elements[7] = 103; let expected_bytes = crate::array::transmute_to_bytes_vec(expected_elements);
assert_eq!(ArrayBytes::from(expected_bytes), decoded_after_compact);
// Compacting an already-compact shard is a no-op (returns None).
let compacted_again = codec
.compact(
Cow::Borrowed(&compacted),
&chunk_shape,
&data_type,
&fill_value,
&CodecOptions::default(),
)
.unwrap();
assert!(compacted_again.is_none());
let original_compacted = codec
.compact(
Cow::Borrowed(&original_encoded),
&chunk_shape,
&data_type,
&fill_value,
&CodecOptions::default(),
)
.unwrap();
assert!(original_compacted.is_none());
}
}