use anyhow::ensure;
use byte_slice_cast::{AsMutByteSlice, AsSliceOf, FromByteSlice, ToMutByteSlice};
use bytes::Bytes;
use futures::{Stream, StreamExt};
use std::pin::Pin;
use crate::extent::Extents;
use crate::filters::byteorder::{Order, ToNative};
pub trait Reader {
fn read_to(&mut self, extents: &Extents, dst: &mut [u8]) -> Result<usize, anyhow::Error>;
fn order(&self) -> Order;
fn dsize(&self) -> usize;
fn shape(&self) -> &[u64];
}
#[cfg(feature = "unstable")]
fn empty_vec<T>(vsz: usize) -> Vec<T>
where
T: Default,
{
let values = Box::<[T]>::new_uninit_slice(vsz);
let values = unsafe { values.assume_init() };
values.into_vec()
}
#[cfg(not(feature = "unstable"))]
fn empty_vec<T>(vsz: usize) -> Vec<T>
where
T: Default,
{
let mut values = Vec::with_capacity(vsz);
values.resize_with(vsz, T::default);
values
}
pub trait ReaderExt: Reader {
fn values_to<T, E>(&mut self, extents: E, dst: &mut [T]) -> Result<usize, anyhow::Error>
where
T: ToMutByteSlice,
[T]: ToNative,
E: TryInto<Extents>,
E::Error: Into<anyhow::Error>,
{
let extents = extents.try_into().map_err(|e| e.into())?;
let r = self.read_to(&extents, dst.as_mut_byte_slice())?;
dst.to_native(self.order());
Ok(r)
}
fn values<T, E>(&mut self, extents: E) -> Result<Vec<T>, anyhow::Error>
where
T: ToMutByteSlice + Default,
[T]: ToNative,
E: TryInto<Extents>,
E::Error: Into<anyhow::Error>,
{
let dsz = self.dsize();
let extents = extents.try_into().map_err(|e| e.into())?;
let counts = extents.get_counts(self.shape())?;
let vsz = counts.product::<u64>() as usize * dsz / std::mem::size_of::<T>();
ensure!(
dsz % std::mem::size_of::<T>() == 0,
"size of datatype ({}) not multiple of target {}",
dsz,
std::mem::size_of::<T>()
);
ensure!((dsz * vsz) % std::mem::align_of::<T>() == 0, "alignment of datatype ({}) not a multiple of datatype size and length {}*{}={}, alignment may not match and result in unsoundness", std::mem::align_of::<T>(), dsz, vsz, vsz * dsz);
let mut values = empty_vec(vsz);
self.values_to(extents, values.as_mut_slice())?;
Ok(values)
}
}
impl<T: ?Sized + Reader> ReaderExt for T {}
pub trait ParReader {
fn read_to_par(&self, extents: &Extents, dst: &mut [u8]) -> Result<usize, anyhow::Error>;
}
pub trait ParReaderExt: Reader + ParReader {
fn values_to_par<T, E>(&self, extents: E, dst: &mut [T]) -> Result<usize, anyhow::Error>
where
T: ToMutByteSlice,
[T]: ToNative,
E: TryInto<Extents>,
E::Error: Into<anyhow::Error>,
{
let extents = extents.try_into().map_err(|e| e.into())?;
let r = self.read_to_par(&extents, dst.as_mut_byte_slice())?;
dst.to_native(self.order());
Ok(r)
}
fn values_par<T, E>(&self, extents: E) -> Result<Vec<T>, anyhow::Error>
where
T: ToMutByteSlice + Default,
[T]: ToNative,
E: TryInto<Extents>,
E::Error: Into<anyhow::Error>,
{
let dsz = self.dsize();
let extents = extents.try_into().map_err(|e| e.into())?;
let counts = extents.get_counts(self.shape())?;
let vsz = counts.product::<u64>() as usize * dsz / std::mem::size_of::<T>();
ensure!(
dsz % std::mem::size_of::<T>() == 0,
"size of datatype ({}) not multiple of target {}",
dsz,
std::mem::size_of::<T>()
);
ensure!((dsz * vsz) % std::mem::align_of::<T>() == 0, "alignment of datatype ({}) not a multiple of datatype size and length {}*{}={}, alignment may not match and result in unsoundness", std::mem::align_of::<T>(), dsz, vsz, vsz * dsz);
let mut values = empty_vec(vsz);
self.values_to_par(extents, values.as_mut_slice())?;
Ok(values)
}
fn values_dyn_par<T, E>(&self, extents: E) -> Result<ndarray::ArrayD<T>, anyhow::Error>
where
T: ToMutByteSlice + Default,
[T]: ToNative,
E: TryInto<Extents>,
E::Error: Into<anyhow::Error>,
{
let dsz = self.dsize();
let extents = extents.try_into().map_err(|e| e.into())?;
let counts = extents.get_counts(self.shape())?;
let dims = counts.map(|d| d as usize).collect::<Vec<_>>();
let vsz = dims.iter().product::<usize>() * dsz / std::mem::size_of::<T>();
ensure!(
dsz % std::mem::size_of::<T>() == 0,
"size of datatype ({}) not multiple of target {}",
dsz,
std::mem::size_of::<T>()
);
ensure!((dsz * vsz) % std::mem::align_of::<T>() == 0, "alignment of datatype ({}) not a multiple of datatype size and length {}*{}={}, alignment may not match and result in unsoundness", std::mem::align_of::<T>(), dsz, vsz, vsz * dsz);
let mut a = unsafe { ndarray::ArrayD::<T>::uninit(dims).assume_init() };
let dst = a.as_slice_mut().unwrap();
self.values_to_par(extents, dst)?;
Ok(a)
}
}
impl<T: ?Sized + Reader + ParReader> ParReaderExt for T {}
pub trait Streamer {
fn stream(
&self,
extents: &Extents,
) -> Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send + 'static>>;
fn stream_xdr(
&self,
extents: &Extents,
) -> Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send + 'static>>;
fn order(&self) -> Order;
fn dsize(&self) -> usize;
}
pub trait StreamerExt: Streamer {
fn stream_values<T, E>(
&self,
extents: E,
) -> Pin<Box<dyn Stream<Item = Result<Vec<T>, anyhow::Error>> + Send + 'static>>
where
T: Unpin + Send + FromByteSlice + Clone,
[T]: ToNative,
E: TryInto<Extents>,
E::Error: Into<anyhow::Error>,
{
let order = self.order();
let extents = extents.try_into().map_err(|e| e.into()).unwrap();
Box::pin(self.stream(&extents).map(move |b| {
let b = b?;
let values = b.as_slice_of::<T>()?;
let mut values = values.to_vec();
values.to_native(order);
Ok(values)
}))
}
}
impl<T: ?Sized + Streamer> StreamerExt for T {}