use std::borrow::Cow;
use std::num::NonZeroU64;
use std::ops::IndexMut;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
#[cfg(not(target_arch = "wasm32"))]
use rayon::prelude::*;
use unsafe_cell_slice::UnsafeCellSlice;
#[cfg(feature = "async")]
use super::sharding_partial_decoder_async::AsyncShardingPartialDecoder;
use super::sharding_partial_decoder_sync::ShardingPartialDecoder;
use super::{
CodecChain, ShardingCodecConfiguration, ShardingCodecConfigurationV1, ShardingCodecOptions,
ShardingIndexLocation, SubchunkWriteOrder, calculate_chunks_per_shard,
compute_index_encoded_size, decode_shard_index, sharding_index_shape, sharding_partial_encoder,
};
use crate::array::array_bytes_internal::merge_chunks_vlen;
use crate::array::concurrency::calc_concurrency_outer_inner;
use crate::array::{
ArrayBytes, ArrayBytesFixedDisjointView, ArrayBytesRaw, ArraySubset, BytesRepresentation,
ChunkShape, ChunkShapeTraits, DataType, DataTypeSize, FillValue, chunk_shape_to_array_shape,
transmute_to_bytes_vec, unravel_index,
};
use zarrs_codec::{
ArrayBytesDecodeIntoTarget, ArrayCodecTraits, ArrayPartialDecoderTraits,
ArrayPartialEncoderTraits, ArrayToBytesCodecTraits, BytesPartialDecoderTraits,
BytesPartialEncoderTraits, CodecError, CodecMetadataOptions, CodecOptions,
CodecSpecificOptions, CodecTraits, PartialDecoderCapability, PartialEncoderCapability,
RecommendedConcurrency,
};
#[cfg(feature = "async")]
use zarrs_codec::{AsyncArrayPartialDecoderTraits, AsyncBytesPartialDecoderTraits};
use zarrs_metadata::Configuration;
use zarrs_plugin::{ExtensionAliasesV3, PluginCreateError, ZarrVersion};
/// The `sharding_indexed` codec.
///
/// Stores a chunk ("shard") as a set of independently encoded subchunks plus
/// an index of `(offset, size)` byte ranges, one pair per subchunk.
#[derive(Clone, Debug)]
pub struct ShardingCodec {
    /// The shape of a subchunk within a shard.
    pub(crate) subchunk_shape: ChunkShape,
    /// Codec chain applied to each subchunk.
    pub(crate) inner_codecs: Arc<CodecChain>,
    /// Codec chain applied to the shard index.
    pub(crate) index_codecs: Arc<CodecChain>,
    /// Whether the shard index is stored at the start or end of the shard.
    pub(crate) index_location: ShardingIndexLocation,
    /// Codec-specific options (e.g. the subchunk write order).
    pub(crate) options: ShardingCodecOptions,
}
impl ShardingCodec {
    /// Create a new `sharding_indexed` codec from its constituent parts.
    #[must_use]
    pub fn new(
        subchunk_shape: ChunkShape,
        inner_codecs: Arc<CodecChain>,
        index_codecs: Arc<CodecChain>,
        index_location: ShardingIndexLocation,
    ) -> Self {
        Self {
            subchunk_shape,
            inner_codecs,
            index_codecs,
            index_location,
            options: ShardingCodecOptions::default(),
        }
    }

    /// Create a new `sharding_indexed` codec from a configuration.
    ///
    /// # Errors
    /// Returns a [`PluginCreateError`] if the configuration variant is
    /// unsupported or an inner/index codec chain cannot be constructed.
    pub fn new_with_configuration(
        configuration: &ShardingCodecConfiguration,
    ) -> Result<Self, PluginCreateError> {
        // Only the V1 configuration variant is supported.
        let ShardingCodecConfiguration::V1(v1) = configuration else {
            return Err(PluginCreateError::Other(
                "this sharding_indexed codec configuration variant is unsupported".to_string(),
            ));
        };
        let inner_codecs = Arc::new(CodecChain::from_metadata(&v1.codecs)?);
        let index_codecs = Arc::new(CodecChain::from_metadata(&v1.index_codecs)?);
        Ok(Self::new(
            v1.chunk_shape.clone(),
            inner_codecs,
            index_codecs,
            v1.index_location,
        ))
    }
}
impl CodecTraits for ShardingCodec {
    /// Upcast to [`std::any::Any`] so callers can downcast the codec.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Build the codec metadata configuration (always the V1 variant).
    fn configuration(
        &self,
        _version: ZarrVersion,
        options: &CodecMetadataOptions,
    ) -> Option<Configuration> {
        let v1 = ShardingCodecConfigurationV1 {
            chunk_shape: self.subchunk_shape.clone(),
            codecs: self.inner_codecs.create_metadatas(options),
            index_codecs: self.index_codecs.create_metadatas(options),
            index_location: self.index_location,
        };
        Some(ShardingCodecConfiguration::V1(v1).into())
    }

    /// Sharding supports both partial reads and partial decoding.
    fn partial_decoder_capability(&self) -> PartialDecoderCapability {
        PartialDecoderCapability {
            partial_read: true,
            partial_decode: true,
        }
    }

    /// Sharding supports partial encoding (in-place subchunk updates).
    fn partial_encoder_capability(&self) -> PartialEncoderCapability {
        PartialEncoderCapability {
            partial_encode: true,
        }
    }
}
impl ArrayCodecTraits for ShardingCodec {
    /// Recommend one unit of concurrent work per subchunk in the shard.
    fn recommended_concurrency(
        &self,
        shape: &[NonZeroU64],
        _data_type: &DataType,
    ) -> Result<RecommendedConcurrency, CodecError> {
        let chunks_per_shard =
            calculate_chunks_per_shard(shape, self.subchunk_shape.as_slice())?;
        Ok(RecommendedConcurrency::new_maximum(
            chunks_per_shard.num_elements_nonzero_usize().into(),
        ))
    }

    /// Partial decoding operates at subchunk granularity.
    fn partial_decode_granularity(&self, _shape: &[NonZeroU64]) -> ChunkShape {
        self.subchunk_shape.clone()
    }
}
// The core encode/decode implementation of the sharding codec.
#[cfg_attr(
    all(feature = "async", not(target_arch = "wasm32")),
    async_trait::async_trait
)]
#[cfg_attr(all(feature = "async", target_arch = "wasm32"), async_trait::async_trait(?Send))]
impl ArrayToBytesCodecTraits for ShardingCodec {
    /// Upcast to a dynamic [`ArrayToBytesCodecTraits`] trait object.
    fn into_dyn(self: Arc<Self>) -> Arc<dyn ArrayToBytesCodecTraits> {
        self as Arc<dyn ArrayToBytesCodecTraits>
    }

    /// Return a codec with any [`ShardingCodecOptions`] in `opts` applied,
    /// or `self` unchanged if no sharding-specific options are present.
    fn with_codec_specific_options(
        self: Arc<Self>,
        opts: &CodecSpecificOptions,
    ) -> Arc<dyn ArrayToBytesCodecTraits> {
        if let Some(sharding_opts) = opts.get_option::<ShardingCodecOptions>() {
            // Clone-on-write: `Arc::make_mut` only copies if the Arc is shared.
            let mut codec = self;
            Arc::make_mut(&mut codec).options = sharding_opts.clone();
            codec
        } else {
            self.into_dyn()
        }
    }

    /// Encode a full shard.
    ///
    /// Each subchunk is encoded with the inner codec chain; an index of
    /// subchunk byte ranges is stored at the configured index location.
    /// The strategy depends on whether the inner chain's encoded size is
    /// bounded (shard buffer can be preallocated) or unbounded.
    fn encode<'a>(
        &self,
        bytes: ArrayBytes<'a>,
        shape: &[NonZeroU64],
        data_type: &DataType,
        fill_value: &FillValue,
        options: &CodecOptions,
    ) -> Result<ArrayBytesRaw<'a>, CodecError> {
        let num_elements = shape.iter().map(|d| d.get()).product::<u64>();
        bytes.validate(num_elements, data_type)?;
        let chunk_bytes_representation = self.inner_codecs.encoded_representation(
            &self.subchunk_shape,
            data_type,
            fill_value,
        )?;
        let bytes = match chunk_bytes_representation {
            BytesRepresentation::BoundedSize(size) | BytesRepresentation::FixedSize(size) => self
                .encode_bounded(
                    &bytes,
                    data_type,
                    fill_value,
                    shape,
                    &self.subchunk_shape,
                    size,
                    options,
                ),
            BytesRepresentation::UnboundedSize => self.encode_unbounded(
                &bytes,
                data_type,
                fill_value,
                shape,
                &self.subchunk_shape,
                options,
            ),
        }?;
        Ok(ArrayBytesRaw::from(bytes))
    }

    /// Decode a complete encoded shard into array bytes.
    ///
    /// The shard index is decoded first; each subchunk is then decoded
    /// concurrently, or filled with the fill value when its index entry is
    /// `(u64::MAX, u64::MAX)` (an elided subchunk).
    #[allow(clippy::too_many_lines)]
    fn decode<'a>(
        &self,
        encoded_shard: ArrayBytesRaw<'a>,
        shape: &[NonZeroU64],
        data_type: &DataType,
        fill_value: &FillValue,
        options: &CodecOptions,
    ) -> Result<ArrayBytes<'a>, CodecError> {
        let chunks_per_shard = calculate_chunks_per_shard(shape, &self.subchunk_shape)?;
        let num_chunks = chunks_per_shard
            .as_slice()
            .iter()
            .map(|i| usize::try_from(i.get()).unwrap())
            .product::<usize>();
        let shard_index =
            self.decode_index(&encoded_shard, chunks_per_shard.as_slice(), options)?;
        // Split the concurrency budget between shard-level parallelism (across
        // subchunks) and subchunk-level parallelism (within the inner chain).
        let (shard_concurrent_limit, concurrency_limit_subchunks) = calc_concurrency_outer_inner(
            options.concurrent_target(),
            &self.recommended_concurrency(shape, data_type)?,
            &self
                .inner_codecs
                .recommended_concurrency(&self.subchunk_shape, data_type)?,
        );
        let options = options.with_concurrent_target(concurrency_limit_subchunks);
        // Optional data types are not supported by this codec.
        if data_type.is_optional() {
            return Err(CodecError::UnsupportedDataType(
                data_type.clone(),
                Self::aliases_v3().default_name.to_string(),
            ));
        }
        let shard_shape_u64 = bytemuck::must_cast_slice(shape);
        match data_type.size() {
            DataTypeSize::Variable => {
                // Variable-sized elements: decode each subchunk to owned bytes
                // and merge afterwards.
                let decode_subchunk = |chunk_index: usize| {
                    let chunk_subset = self
                        .chunk_index_to_subset(chunk_index as u64, chunks_per_shard.as_slice())
                        .expect("inbounds chunk");
                    let offset = shard_index[chunk_index * 2];
                    let size = shard_index[chunk_index * 2 + 1];
                    let chunk_bytes = if offset == u64::MAX && size == u64::MAX {
                        // Elided subchunk: materialise the fill value.
                        ArrayBytes::new_fill_value(
                            data_type,
                            self.subchunk_shape.num_elements_u64(),
                            fill_value,
                        )?
                        .into_variable()?
                    }
                    // NOTE(review): `offset + size` can overflow u64 on a corrupt
                    // index where only one of the pair is u64::MAX — consider
                    // `checked_add` to return a clean error instead of a panic.
                    else if usize::try_from(offset + size).unwrap() > encoded_shard.len() {
                        return Err(CodecError::Other(
                            "The shard index references out-of-bounds bytes. The chunk may be corrupted."
                                .to_string(),
                        ));
                    } else {
                        let offset: usize = offset.try_into().unwrap();
                        let size: usize = size.try_into().unwrap();
                        let encoded_chunk = &encoded_shard[offset..offset + size];
                        self.inner_codecs
                            .decode(
                                Cow::Borrowed(encoded_chunk),
                                &self.subchunk_shape,
                                data_type,
                                fill_value,
                                &options,
                            )?
                            .into_variable()?
                    };
                    Ok((chunk_bytes, chunk_subset))
                };
                let chunk_bytes_and_subsets = crate::iter_concurrent_limit!(
                    shard_concurrent_limit,
                    (0..num_chunks),
                    map,
                    decode_subchunk
                )
                .collect::<Result<Vec<_>, _>>()?;
                Ok(ArrayBytes::Variable(merge_chunks_vlen(
                    chunk_bytes_and_subsets,
                    shard_shape_u64,
                )))
            }
            DataTypeSize::Fixed(data_type_size) => {
                let num_elements = shape.iter().map(|d| d.get()).product::<u64>();
                let size_output = usize::try_from(num_elements).unwrap() * data_type_size;
                if size_output == 0 {
                    return Ok(ArrayBytes::new_flen(vec![]));
                }
                // Fixed-size elements: decode each subchunk directly into its
                // disjoint region of the output buffer's spare capacity.
                let mut decoded_shard = Vec::<u8>::with_capacity(size_output);
                {
                    let output =
                        UnsafeCellSlice::new_from_vec_with_spare_capacity(&mut decoded_shard);
                    let decode_chunk = |chunk_index: usize| {
                        let chunk_subset = self
                            .chunk_index_to_subset(chunk_index as u64, chunks_per_shard.as_slice())
                            .expect("inbounds chunk");
                        // SAFETY: subchunk subsets are disjoint, so each view
                        // covers a distinct region of `output`.
                        let mut output_view_subchunk = unsafe {
                            ArrayBytesFixedDisjointView::new(
                                output,
                                data_type_size,
                                shard_shape_u64,
                                chunk_subset,
                            )?
                        };
                        let offset = shard_index[chunk_index * 2];
                        let size = shard_index[chunk_index * 2 + 1];
                        if offset == u64::MAX && size == u64::MAX {
                            // Elided subchunk: write the fill value.
                            output_view_subchunk.fill(fill_value.as_ne_bytes())?;
                        }
                        // NOTE(review): `offset + size` can overflow u64 on a
                        // corrupt index (see the variable-size path above).
                        else if usize::try_from(offset + size).unwrap() > encoded_shard.len() {
                            return Err(CodecError::Other(
                                "The shard index references out-of-bounds bytes. The chunk may be corrupted."
                                    .to_string(),
                            ));
                        } else {
                            let offset: usize = offset.try_into().unwrap();
                            let size: usize = size.try_into().unwrap();
                            let encoded_chunk = &encoded_shard[offset..offset + size];
                            self.inner_codecs.decode_into(
                                Cow::Borrowed(encoded_chunk),
                                &self.subchunk_shape,
                                data_type,
                                fill_value,
                                ArrayBytesDecodeIntoTarget::Fixed(&mut output_view_subchunk),
                                &options,
                            )?;
                        }
                        Ok::<_, CodecError>(())
                    };
                    crate::iter_concurrent_limit!(
                        shard_concurrent_limit,
                        (0..num_chunks),
                        try_for_each,
                        decode_chunk
                    )?;
                }
                // SAFETY: every subchunk was decoded into or filled above;
                // assumes the subchunk subsets tile the full shard — TODO confirm
                // calculate_chunks_per_shard guarantees exact coverage.
                unsafe { decoded_shard.set_len(decoded_shard.capacity()) };
                Ok(ArrayBytes::from(decoded_shard))
            }
        }
    }

    /// Rewrite a shard so its live subchunks are stored contiguously,
    /// dropping dead space (e.g. left behind by partial encoding).
    ///
    /// Returns `Ok(None)` if the shard is already compact.
    fn compact<'a>(
        &self,
        bytes: ArrayBytesRaw<'a>,
        shape: &[NonZeroU64],
        _data_type: &DataType,
        _fill_value: &FillValue,
        options: &CodecOptions,
    ) -> Result<Option<ArrayBytesRaw<'a>>, CodecError> {
        let chunks_per_shard = calculate_chunks_per_shard(shape, self.subchunk_shape.as_slice())?;
        let shard_index = self.decode_index(&bytes, chunks_per_shard.as_slice(), options)?;
        let index_shape = sharding_index_shape(chunks_per_shard.as_slice());
        let index_encoded_size =
            compute_index_encoded_size(self.index_codecs.as_ref(), &index_shape)?;
        // The shard is compact iff the live chunk bytes exactly account for
        // everything outside the encoded index.
        let mut needs_compaction = false;
        let mut chunks_size = 0;
        for &[offset, size] in shard_index.as_chunks::<2>().0 {
            if offset != u64::MAX && size != u64::MAX {
                chunks_size += size;
            }
        }
        if chunks_size != bytes.len() as u64 - index_encoded_size {
            needs_compaction = true;
        }
        if !needs_compaction {
            return Ok(None);
        }
        // NOTE(review): this filter tests only `chunk[0]` (offset) against
        // u64::MAX, while the size loop above tests both offset and size —
        // verify the two notions of "live chunk" are intentionally different.
        let data_size: usize = shard_index
            .as_chunks::<2>()
            .0
            .iter()
            .filter(|chunk| chunk[0] != u64::MAX)
            .map(|chunk| usize::try_from(chunk[1]).unwrap())
            .sum();
        let compact_size = data_size + usize::try_from(index_encoded_size).unwrap();
        let mut compact_shard = vec![0u8; compact_size];
        // Rebuild the index; u64::MAX entries mark absent subchunks.
        let mut new_index = vec![u64::MAX; shard_index.len()];
        // Chunk data begins after the index when the index is at the start.
        let mut write_offset = match self.index_location {
            ShardingIndexLocation::Start => usize::try_from(index_encoded_size).unwrap(),
            ShardingIndexLocation::End => 0,
        };
        for (i, &[old_offset, size]) in shard_index.as_chunks::<2>().0.iter().enumerate() {
            if old_offset != u64::MAX && size != u64::MAX {
                let old_offset_usize = usize::try_from(old_offset).unwrap();
                let size_usize = usize::try_from(size).unwrap();
                if old_offset_usize + size_usize > bytes.len() {
                    return Err(CodecError::Other(
                        "The shard index references out-of-bounds bytes. The chunk may be corrupted."
                            .to_string(),
                    ));
                }
                // Copy the subchunk to its new, contiguous position.
                compact_shard[write_offset..write_offset + size_usize]
                    .copy_from_slice(&bytes[old_offset_usize..old_offset_usize + size_usize]);
                new_index[i * 2] = u64::try_from(write_offset).unwrap();
                new_index[i * 2 + 1] = size;
                write_offset += size_usize;
            }
        }
        // Encode the rebuilt index and store it at the configured location.
        let index_bytes = transmute_to_bytes_vec(new_index);
        let encoded_index = self.index_codecs.encode(
            ArrayBytes::from(index_bytes),
            &index_shape,
            &crate::array::data_type::uint64(),
            &FillValue::from(u64::MAX),
            options,
        )?;
        match self.index_location {
            ShardingIndexLocation::Start => {
                compact_shard[..encoded_index.len()].copy_from_slice(&encoded_index);
            }
            ShardingIndexLocation::End => {
                let index_start = compact_size - encoded_index.len();
                compact_shard[index_start..].copy_from_slice(&encoded_index);
            }
        }
        Ok(Some(Cow::Owned(compact_shard)))
    }

    /// Decode an encoded shard directly into a caller-provided output view.
    ///
    /// Only fixed-size targets are supported; optional targets are rejected.
    #[allow(clippy::too_many_lines)]
    fn decode_into(
        &self,
        encoded_shard: ArrayBytesRaw<'_>,
        shape: &[NonZeroU64],
        data_type: &DataType,
        fill_value: &FillValue,
        output_target: ArrayBytesDecodeIntoTarget<'_>,
        options: &CodecOptions,
    ) -> Result<(), CodecError> {
        let output_view = match output_target {
            ArrayBytesDecodeIntoTarget::Fixed(data) => data,
            ArrayBytesDecodeIntoTarget::Optional(..) => {
                return Err(CodecError::UnsupportedDataType(
                    data_type.clone(),
                    Self::aliases_v3().default_name.to_string(),
                ));
            }
        };
        let chunks_per_shard = calculate_chunks_per_shard(shape, &self.subchunk_shape)?;
        let num_chunks = chunks_per_shard
            .as_slice()
            .iter()
            .map(|i| usize::try_from(i.get()).unwrap())
            .product::<usize>();
        let shard_index =
            self.decode_index(&encoded_shard, chunks_per_shard.as_slice(), options)?;
        // Split the concurrency budget between subchunks and the inner chain.
        let (shard_concurrent_limit, concurrency_limit_subchunks) = calc_concurrency_outer_inner(
            options.concurrent_target(),
            &self.recommended_concurrency(shape, data_type)?,
            &self
                .inner_codecs
                .recommended_concurrency(&self.subchunk_shape, data_type)?,
        );
        let options = options.with_concurrent_target(concurrency_limit_subchunks);
        let decode_chunk = |chunk_index: usize| {
            let chunk_subset = self
                .chunk_index_to_subset(chunk_index as u64, chunks_per_shard.as_slice())
                .expect("inbounds chunk");
            // Translate the subchunk subset by the output view's own start so
            // it addresses the correct region of the caller's buffer.
            let output_subset_chunk = ArraySubset::new_with_start_shape(
                std::iter::zip(
                    output_view.subset().start().iter(),
                    chunk_subset.start().iter(),
                )
                .map(|(o, s)| o + s)
                .collect(),
                chunk_subset.shape().to_vec(),
            )
            .unwrap();
            // SAFETY: subchunk subsets are disjoint, so each subdivided view
            // covers a distinct region of the output.
            let mut output_view_subchunk = unsafe {
                output_view.subdivide(output_subset_chunk)?
            };
            let offset = shard_index[chunk_index * 2];
            let size = shard_index[chunk_index * 2 + 1];
            if offset == u64::MAX && size == u64::MAX {
                // Elided subchunk: write the fill value.
                output_view_subchunk.fill(fill_value.as_ne_bytes())?;
            }
            // NOTE(review): `offset + size` can overflow u64 on a corrupt index
            // (see `decode`); consider `checked_add`.
            else if usize::try_from(offset + size).unwrap() > encoded_shard.len() {
                return Err(CodecError::Other(
                    "The shard index references out-of-bounds bytes. The chunk may be corrupted."
                        .to_string(),
                ));
            } else {
                let offset: usize = offset.try_into().unwrap();
                let size: usize = size.try_into().unwrap();
                let encoded_chunk = &encoded_shard[offset..offset + size];
                self.inner_codecs.decode_into(
                    Cow::Borrowed(encoded_chunk),
                    &self.subchunk_shape,
                    data_type,
                    fill_value,
                    ArrayBytesDecodeIntoTarget::Fixed(&mut output_view_subchunk),
                    &options,
                )?;
            }
            Ok::<_, CodecError>(())
        };
        crate::iter_concurrent_limit!(
            shard_concurrent_limit,
            (0..num_chunks),
            try_for_each,
            decode_chunk
        )?;
        Ok(())
    }

    /// Construct a synchronous partial decoder for a shard.
    fn partial_decoder(
        self: Arc<Self>,
        input_handle: Arc<dyn BytesPartialDecoderTraits>,
        shape: &[NonZeroU64],
        data_type: &DataType,
        fill_value: &FillValue,
        options: &CodecOptions,
    ) -> Result<Arc<dyn ArrayPartialDecoderTraits>, CodecError> {
        Ok(Arc::new(ShardingPartialDecoder::new(
            input_handle,
            data_type.clone(),
            fill_value.clone(),
            ChunkShape::from(shape.to_vec()),
            self.subchunk_shape.clone(),
            self.inner_codecs.clone(),
            &self.index_codecs,
            self.index_location,
            options,
            self.options.clone(),
        )?))
    }

    /// Construct an asynchronous partial decoder for a shard.
    #[cfg(feature = "async")]
    async fn async_partial_decoder(
        self: Arc<Self>,
        input_handle: Arc<dyn AsyncBytesPartialDecoderTraits>,
        shape: &[NonZeroU64],
        data_type: &DataType,
        fill_value: &FillValue,
        options: &CodecOptions,
    ) -> Result<Arc<dyn AsyncArrayPartialDecoderTraits>, CodecError> {
        Ok(Arc::new(
            AsyncShardingPartialDecoder::new(
                input_handle,
                data_type.clone(),
                fill_value.clone(),
                ChunkShape::from(shape.to_vec()),
                self.subchunk_shape.clone(),
                self.inner_codecs.clone(),
                &self.index_codecs,
                self.index_location,
                options,
                self.options.clone(),
            )
            .await?,
        ))
    }

    /// Construct a partial encoder for in-place subchunk updates.
    fn partial_encoder(
        self: Arc<Self>,
        input_output_handle: Arc<dyn BytesPartialEncoderTraits>,
        shape: &[NonZeroU64],
        data_type: &DataType,
        fill_value: &FillValue,
        options: &CodecOptions,
    ) -> Result<Arc<dyn ArrayPartialEncoderTraits>, CodecError> {
        Ok(Arc::new(
            sharding_partial_encoder::ShardingPartialEncoder::new(
                input_output_handle,
                data_type.clone(),
                fill_value.clone(),
                ChunkShape::from(shape.to_vec()),
                self.subchunk_shape.clone(),
                self.inner_codecs.clone(),
                self.index_codecs.clone(),
                self.index_location,
                options,
                self.options.clone(),
            )?,
        ))
    }

    /// Compute the encoded-size representation of a whole shard.
    ///
    /// Bounded if and only if the inner codec chain is bounded: the bound is
    /// `num_subchunks * subchunk_bound + index_size`.
    fn encoded_representation(
        &self,
        shape: &[NonZeroU64],
        data_type: &DataType,
        fill_value: &FillValue,
    ) -> Result<BytesRepresentation, CodecError> {
        let chunk_bytes_representation = self
            .inner_codecs
            .encoded_representation(shape, data_type, fill_value)?;
        match chunk_bytes_representation {
            BytesRepresentation::BoundedSize(size) | BytesRepresentation::FixedSize(size) => {
                let chunks_per_shard =
                    calculate_chunks_per_shard(shape, self.subchunk_shape.as_slice())?;
                let index_encoded_size = compute_index_encoded_size(
                    self.index_codecs.as_ref(),
                    &sharding_index_shape(chunks_per_shard.as_slice()),
                )?;
                let shard_size = Self::encoded_shard_bounded_size(
                    index_encoded_size,
                    size,
                    chunks_per_shard.as_slice(),
                );
                Ok(BytesRepresentation::BoundedSize(shard_size))
            }
            BytesRepresentation::UnboundedSize => Ok(BytesRepresentation::UnboundedSize),
        }
    }
}
// Private encoding helpers and index decoding.
impl ShardingCodec {
    /// Map a linearised (C-order) subchunk index to its element subset within
    /// the shard, or `None` if `chunk_index` is out of bounds.
    fn chunk_index_to_subset(
        &self,
        chunk_index: u64,
        chunks_per_shard: &[NonZeroU64],
    ) -> Option<ArraySubset> {
        let chunks_per_shard = chunk_shape_to_array_shape(chunks_per_shard);
        let chunk_indices = unravel_index(chunk_index, chunks_per_shard.as_slice())?;
        // Element start = subchunk grid position * subchunk shape.
        let chunk_start = std::iter::zip(&chunk_indices, self.subchunk_shape.as_slice())
            .map(|(i, c)| i * c.get())
            .collect::<Vec<_>>();
        let shape = self.subchunk_shape.as_slice();
        let ranges = shape
            .iter()
            .zip(&chunk_start)
            .map(|(&sh, &st)| st..(st + sh.get()));
        Some(ArraySubset::from(ranges))
    }

    /// Worst-case encoded shard size: every subchunk at its bounded encoded
    /// size, plus the encoded index.
    fn encoded_shard_bounded_size(
        index_encoded_size: u64,
        chunk_encoded_size: u64,
        chunks_per_shard: &[NonZeroU64],
    ) -> u64 {
        let num_chunks = chunks_per_shard.iter().map(|i| i.get()).product::<u64>();
        num_chunks * chunk_encoded_size + index_encoded_size
    }

    /// Encode the subchunk at `chunk_index` with the inner codec chain.
    ///
    /// Returns `None` when the subchunk is entirely the fill value (it is
    /// then elided from the shard), otherwise `Some` of the chunk index and
    /// its encoded bytes, or the first error encountered.
    #[expect(clippy::too_many_arguments)]
    fn encode_inner_by_chunk_index(
        &self,
        chunk_index: usize,
        decoded_value: &ArrayBytes,
        shard_shape: &[NonZeroU64],
        chunks_per_shard: &[NonZeroU64],
        fill_value: &FillValue,
        data_type: &DataType,
        subchunk_shape: &[NonZeroU64],
        options_inner: &CodecOptions,
    ) -> Option<Result<(usize, Vec<u8>), CodecError>> {
        let chunk_subset = self
            .chunk_index_to_subset(chunk_index as u64, chunks_per_shard)
            .expect("inbounds chunk");
        let bytes = decoded_value.extract_array_subset(
            &chunk_subset,
            bytemuck::must_cast_slice(shard_shape),
            data_type,
        );
        let bytes = match bytes {
            Ok(bytes) => bytes,
            Err(err) => return Some(Err(err)),
        };
        // Pure fill-value subchunks are not stored.
        let is_fill_value = bytes.is_fill_value(fill_value);
        if is_fill_value {
            None
        } else {
            let encoded_chunk = self.inner_codecs.encode(
                bytes,
                subchunk_shape,
                data_type,
                fill_value,
                options_inner,
            );
            match encoded_chunk {
                Ok(encoded_chunk) => Some(Ok((chunk_index, encoded_chunk.to_vec()))),
                Err(err) => Some(Err(err)),
            }
        }
    }

    /// Encode a shard when the inner codecs have a bounded encoded size.
    ///
    /// The worst-case shard buffer (`num_chunks * bound + index`) is
    /// preallocated and subchunks are written into its spare capacity
    /// concurrently; the final length is trimmed with `set_len`.
    #[allow(clippy::too_many_lines)]
    #[expect(clippy::too_many_arguments)]
    fn encode_bounded(
        &self,
        decoded_value: &ArrayBytes,
        data_type: &DataType,
        fill_value: &FillValue,
        shard_shape: &[NonZeroU64],
        subchunk_shape: &[NonZeroU64],
        chunk_size_bounded: u64,
        options: &CodecOptions,
    ) -> Result<Vec<u8>, CodecError> {
        decoded_value.validate(shard_shape.num_elements_u64(), data_type)?;
        let chunks_per_shard = calculate_chunks_per_shard(shard_shape, subchunk_shape)?;
        let index_shape = sharding_index_shape(chunks_per_shard.as_slice());
        let index_encoded_size =
            compute_index_encoded_size(self.index_codecs.as_ref(), &index_shape)?;
        let shard_size_bounded = Self::encoded_shard_bounded_size(
            index_encoded_size,
            chunk_size_bounded,
            chunks_per_shard.as_slice(),
        );
        let shard_size_bounded = usize::try_from(shard_size_bounded).unwrap();
        let index_encoded_size = usize::try_from(index_encoded_size).unwrap();
        let mut shard = Vec::with_capacity(shard_size_bounded);
        // u64::MAX entries denote subchunks elided as pure fill value.
        let mut shard_index = vec![u64::MAX; index_shape.num_elements_usize()];
        // Chunk data begins after the index when the index is at the start.
        let encoded_shard_offset: usize = match self.index_location {
            ShardingIndexLocation::Start => index_encoded_size,
            ShardingIndexLocation::End => 0,
        };
        // Split the concurrency budget between subchunks and the inner chain.
        let (shard_concurrent_limit, concurrency_limit_subchunks) = calc_concurrency_outer_inner(
            options.concurrent_target(),
            &self.recommended_concurrency(shard_shape, data_type)?,
            &self
                .inner_codecs
                .recommended_concurrency(subchunk_shape, data_type)?,
        );
        let options = options.with_concurrent_target(concurrency_limit_subchunks);
        let n_chunks = chunks_per_shard
            .as_slice()
            .iter()
            .map(|i| usize::try_from(i.get()).unwrap())
            .product::<usize>();
        let shard_slice = UnsafeCellSlice::new_from_vec_with_spare_capacity(&mut shard);
        let encoded_shard_offset = match self.options.subchunk_write_order() {
            SubchunkWriteOrder::Random => {
                // Chunks land in whatever order workers finish; an atomic bump
                // allocator hands out non-overlapping byte ranges.
                let encoded_shard_offset_atomic: AtomicUsize = encoded_shard_offset.into();
                let shard_index_slice = UnsafeCellSlice::new(&mut shard_index);
                crate::iter_concurrent_limit!(
                    shard_concurrent_limit,
                    (0..n_chunks),
                    try_for_each,
                    |chunk_index: usize| {
                        let maybe_chunk_encoded_with_id = self.encode_inner_by_chunk_index(
                            chunk_index,
                            decoded_value,
                            shard_shape,
                            chunks_per_shard.as_slice(),
                            fill_value,
                            data_type,
                            subchunk_shape,
                            &options,
                        );
                        // None means the subchunk was pure fill value (elided).
                        if let Some(chunk_encoded_with_id) = maybe_chunk_encoded_with_id {
                            let chunk_encoded = chunk_encoded_with_id?.1;
                            let chunk_offset = encoded_shard_offset_atomic.fetch_add(
                                chunk_encoded.len(),
                                std::sync::atomic::Ordering::Relaxed,
                            );
                            if chunk_offset + chunk_encoded.len() > shard_size_bounded {
                                return Err(CodecError::from(
                                    "Sharding did not allocate a large enough buffer",
                                ));
                            }
                            // SAFETY: each worker writes a distinct index pair and
                            // a distinct byte range handed out by the atomic.
                            unsafe {
                                let shard_index_unsafe = shard_index_slice
                                    .index_mut(chunk_index * 2..chunk_index * 2 + 2);
                                shard_index_unsafe[0] = u64::try_from(chunk_offset).unwrap();
                                shard_index_unsafe[1] = u64::try_from(chunk_encoded.len()).unwrap();
                                shard_slice
                                    .index_mut(chunk_offset..chunk_offset + chunk_encoded.len())
                                    .copy_from_slice(&chunk_encoded);
                            }
                        }
                        Ok(())
                    }
                )?;
                Ok::<_, CodecError>(
                    encoded_shard_offset_atomic.load(std::sync::atomic::Ordering::Relaxed),
                )
            }
            SubchunkWriteOrder::C => {
                // Deterministic layout: encode (possibly in parallel), then lay
                // chunks out sequentially in C (row-major) chunk order.
                let chunk_order = 0..n_chunks;
                let encoded_chunk_ids_and_chunks: Vec<(usize, Vec<u8>)> =
                    crate::iter_concurrent_limit!(
                        shard_concurrent_limit,
                        chunk_order,
                        filter_map,
                        |chunk_index: usize| {
                            self.encode_inner_by_chunk_index(
                                chunk_index,
                                decoded_value,
                                shard_shape,
                                chunks_per_shard.as_slice(),
                                fill_value,
                                data_type,
                                subchunk_shape,
                                &options,
                            )
                        }
                    )
                    .collect::<Result<Vec<_>, _>>()?;
                // First pass: assign contiguous offsets/sizes into the index.
                let total_offset = encoded_chunk_ids_and_chunks.iter().fold(
                    encoded_shard_offset,
                    |acc: usize, (i, chunk)| {
                        let chunk_len_usize = chunk.len();
                        let chunk_length = u64::try_from(chunk_len_usize).unwrap();
                        let chunk_offset = u64::try_from(acc).unwrap();
                        let shard_index_unsafe = shard_index.index_mut(i * 2..i * 2 + 2);
                        shard_index_unsafe[0] = chunk_offset;
                        shard_index_unsafe[1] = chunk_length;
                        acc + chunk_len_usize
                    },
                );
                // Second pass: copy each chunk into its assigned byte range.
                crate::iter_concurrent_limit!(
                    shard_concurrent_limit,
                    encoded_chunk_ids_and_chunks,
                    for_each,
                    |(chunk_index, chunk): (usize, Vec<u8>)| {
                        // SAFETY: the ranges assigned above are disjoint.
                        unsafe {
                            let shard_index_loc =
                                &shard_index[chunk_index * 2..chunk_index * 2 + 2];
                            let chunk_offset = usize::try_from(shard_index_loc[0]).unwrap();
                            let chunk_encoded_len = usize::try_from(shard_index_loc[1]).unwrap();
                            shard_slice
                                .index_mut(chunk_offset..chunk_offset + chunk_encoded_len)
                                .copy_from_slice(&chunk);
                        }
                    }
                );
                Ok(total_offset)
            }
        }?;
        // Total shard length: bytes written so far plus the index if it is
        // stored at the end (a start-located index was accounted for already).
        let shard_length = encoded_shard_offset
            + match self.index_location {
                ShardingIndexLocation::Start => 0,
                ShardingIndexLocation::End => index_encoded_size,
            };
        // Encode and write the shard index into the spare capacity.
        let shard_index_bytes: ArrayBytesRaw = transmute_to_bytes_vec(shard_index).into();
        let encoded_array_index = self.index_codecs.encode(
            shard_index_bytes.into(),
            &index_shape,
            &crate::array::data_type::uint64(),
            &FillValue::from(u64::MAX),
            &options,
        )?;
        {
            // SAFETY: all writes stay within the vector's allocated capacity.
            let shard_slice = unsafe { crate::vec_spare_capacity_to_mut_slice(&mut shard) };
            match self.index_location {
                ShardingIndexLocation::Start => {
                    shard_slice[..encoded_array_index.len()].copy_from_slice(&encoded_array_index);
                }
                ShardingIndexLocation::End => {
                    shard_slice[shard_length - encoded_array_index.len()..shard_length]
                        .copy_from_slice(&encoded_array_index);
                }
            }
        }
        // SAFETY: bytes up to `shard_length` were written above — chunk data
        // is allocated contiguously from `encoded_shard_offset` and the index
        // fills the remaining region.
        unsafe { shard.set_len(shard_length) };
        Ok(shard)
    }

    /// Encode a shard when the inner codec chain's encoded size is unbounded.
    ///
    /// All subchunks are encoded first, the exact shard size is computed, and
    /// the shard buffer is filled in a second pass.
    #[allow(clippy::too_many_lines)]
    fn encode_unbounded(
        &self,
        decoded_value: &ArrayBytes,
        data_type: &DataType,
        fill_value: &FillValue,
        shard_shape: &[NonZeroU64],
        subchunk_shape: &[NonZeroU64],
        options: &CodecOptions,
    ) -> Result<Vec<u8>, CodecError> {
        decoded_value.validate(shard_shape.num_elements_u64(), data_type)?;
        let chunks_per_shard = calculate_chunks_per_shard(shard_shape, subchunk_shape)?;
        let index_shape = sharding_index_shape(chunks_per_shard.as_slice());
        let index_encoded_size =
            compute_index_encoded_size(self.index_codecs.as_ref(), &index_shape)?;
        let index_encoded_size = usize::try_from(index_encoded_size).unwrap();
        let n_chunks = chunks_per_shard
            .as_slice()
            .iter()
            .map(|i| usize::try_from(i.get()).unwrap())
            .product::<usize>();
        // Split the concurrency budget between subchunks and the inner chain.
        let (shard_concurrent_limit, concurrency_limit_subchunks) = calc_concurrency_outer_inner(
            options.concurrent_target(),
            &self.recommended_concurrency(shard_shape, data_type)?,
            &self
                .inner_codecs
                .recommended_concurrency(subchunk_shape, data_type)?,
        );
        let options_inner = options.with_concurrent_target(concurrency_limit_subchunks);
        // rayon is unavailable on wasm32, hence the two iterator variants.
        #[cfg(not(target_arch = "wasm32"))]
        let iterator = match self.options.subchunk_write_order() {
            SubchunkWriteOrder::Random | SubchunkWriteOrder::C => (0..n_chunks).into_par_iter(),
        };
        #[cfg(target_arch = "wasm32")]
        let iterator = match self.options.subchunk_write_order() {
            SubchunkWriteOrder::Random | SubchunkWriteOrder::C => 0..n_chunks,
        };
        // First pass: encode every non-fill-value subchunk.
        let encoded_chunks: Vec<(usize, Vec<u8>)> = crate::iter_concurrent_limit!(
            shard_concurrent_limit,
            iterator,
            filter_map,
            |chunk_index| self.encode_inner_by_chunk_index(
                chunk_index,
                decoded_value,
                shard_shape,
                chunks_per_shard.as_slice(),
                fill_value,
                data_type,
                subchunk_shape,
                &options_inner
            )
        )
        .collect::<Result<Vec<_>, _>>()?;
        // The exact shard length is now known.
        let encoded_chunk_length = encoded_chunks
            .iter()
            .map(|(_, bytes)| bytes.len())
            .sum::<usize>();
        let shard_length = encoded_chunk_length + index_encoded_size;
        let mut shard = Vec::with_capacity(shard_length);
        // u64::MAX entries denote subchunks elided as pure fill value.
        let mut shard_index = vec![u64::MAX; index_shape.num_elements_usize()];
        // Chunk data begins after the index when the index is at the start.
        let encoded_shard_offset = match self.index_location {
            ShardingIndexLocation::Start => index_encoded_size,
            ShardingIndexLocation::End => 0,
        };
        let shard_slice = UnsafeCellSlice::new_from_vec_with_spare_capacity(&mut shard);
        if !encoded_chunks.is_empty() {
            // NOTE(review): these write loops use `options.concurrent_target()`
            // where `encode_bounded` uses `shard_concurrent_limit` — verify the
            // difference is intentional.
            match self.options.subchunk_write_order() {
                SubchunkWriteOrder::Random => {
                    // Chunks land in completion order via an atomic bump
                    // allocator handing out non-overlapping byte ranges.
                    let encoded_shard_offset_atomic: AtomicUsize = encoded_shard_offset.into();
                    let shard_index_slice = UnsafeCellSlice::new(&mut shard_index);
                    crate::iter_concurrent_limit!(
                        options.concurrent_target(),
                        encoded_chunks,
                        for_each,
                        |(chunk_index, chunk_encoded): (usize, Vec<u8>)| {
                            let chunk_offset = encoded_shard_offset_atomic.fetch_add(
                                chunk_encoded.len(),
                                std::sync::atomic::Ordering::Relaxed,
                            );
                            // SAFETY: each worker writes a distinct index pair
                            // and a distinct byte range handed out by the atomic.
                            unsafe {
                                let shard_index_unsafe = shard_index_slice
                                    .index_mut(chunk_index * 2..chunk_index * 2 + 2);
                                shard_index_unsafe[0] = u64::try_from(chunk_offset).unwrap();
                                shard_index_unsafe[1] = u64::try_from(chunk_encoded.len()).unwrap();
                                shard_slice
                                    .index_mut(chunk_offset..chunk_offset + chunk_encoded.len())
                                    .copy_from_slice(&chunk_encoded);
                            }
                        }
                    );
                }
                SubchunkWriteOrder::C => {
                    // First: assign contiguous offsets in C (row-major) order.
                    let mut offset = encoded_shard_offset;
                    for (i, chunk) in &encoded_chunks {
                        let chunk_len_usize = chunk.len();
                        let chunk_length = u64::try_from(chunk_len_usize).unwrap();
                        let chunk_offset = u64::try_from(offset).unwrap();
                        let shard_index_unsafe = shard_index.index_mut(i * 2..i * 2 + 2);
                        shard_index_unsafe[0] = chunk_offset;
                        shard_index_unsafe[1] = chunk_length;
                        offset += chunk_len_usize;
                    }
                    // Then: copy each chunk into its assigned byte range.
                    crate::iter_concurrent_limit!(
                        options.concurrent_target(),
                        encoded_chunks,
                        for_each,
                        |(chunk_index, chunk): (usize, Vec<u8>)| {
                            // SAFETY: the ranges assigned above are disjoint.
                            unsafe {
                                let shard_index_loc =
                                    &shard_index[chunk_index * 2..chunk_index * 2 + 2];
                                let chunk_offset = usize::try_from(shard_index_loc[0]).unwrap();
                                let chunk_encoded_len =
                                    usize::try_from(shard_index_loc[1]).unwrap();
                                shard_slice
                                    .index_mut(chunk_offset..chunk_offset + chunk_encoded_len)
                                    .copy_from_slice(&chunk);
                            }
                        }
                    );
                }
            }
        }
        // Encode and write the shard index into the spare capacity.
        let encoded_array_index = self.index_codecs.encode(
            ArrayBytes::from(transmute_to_bytes_vec(shard_index)),
            &index_shape,
            &crate::array::data_type::uint64(),
            &FillValue::from(u64::MAX),
            options,
        )?;
        {
            // SAFETY: all writes stay within the vector's allocated capacity.
            let shard_slice = unsafe { crate::vec_spare_capacity_to_mut_slice(&mut shard) };
            match self.index_location {
                ShardingIndexLocation::Start => {
                    shard_slice[..encoded_array_index.len()].copy_from_slice(&encoded_array_index);
                }
                ShardingIndexLocation::End => {
                    shard_slice[shard_length - encoded_array_index.len()..]
                        .copy_from_slice(&encoded_array_index);
                }
            }
        }
        // SAFETY: `shard_length` equals the chunk bytes plus the index bytes,
        // all of which were written above.
        unsafe { shard.set_len(shard_length) };
        Ok(shard)
    }

    /// Decode the shard index (one `(offset, size)` `u64` pair per subchunk)
    /// from an encoded shard.
    ///
    /// # Errors
    /// Returns an error if the shard is smaller than the encoded index or the
    /// index fails to decode.
    pub fn decode_index(
        &self,
        encoded_shard: &[u8],
        chunks_per_shard: &[NonZeroU64],
        options: &CodecOptions,
    ) -> Result<Vec<u64>, CodecError> {
        let index_shape = sharding_index_shape(chunks_per_shard);
        let index_encoded_size =
            compute_index_encoded_size(self.index_codecs.as_ref(), &index_shape)?;
        if (encoded_shard.len() as u64) < index_encoded_size {
            return Err(CodecError::Other(
                "The encoded shard is smaller than the expected size of its index.".to_string(),
            ));
        }
        // The index occupies the first or last `index_encoded_size` bytes,
        // depending on the configured index location.
        let encoded_shard_index = match self.index_location {
            ShardingIndexLocation::Start => {
                &encoded_shard[..index_encoded_size.try_into().unwrap()]
            }
            ShardingIndexLocation::End => {
                let encoded_shard_offset =
                    usize::try_from(encoded_shard.len() as u64 - index_encoded_size).unwrap();
                &encoded_shard[encoded_shard_offset..]
            }
        };
        decode_shard_index(
            encoded_shard_index,
            &index_shape,
            self.index_codecs.as_ref(),
            options,
        )
    }
}