use crate::array_helpers;
use crate::error::{PipeError, Result};
use crate::iterator_common::ChunkGuard;
use crate::pipe_common::{MetadataManager, PipeState, ReaderManager, ShapeManager};
use crate::traits::{ChunkSource, Readable, SizedDimension};
use ndarray::{ArrayView, Dimension, StrideShape};
use uuid::Uuid;
pub struct ReadOnlyPipe<'data, A: Copy, D: SizedDimension + Dimension, M: Clone> {
data: &'data [A],
nelements: usize,
shape_manager: ShapeManager<D>,
reader_manager: ReaderManager,
metadata_manager: MetadataManager<M>,
}
impl<'data, A: Copy, D: SizedDimension + Dimension, M: Clone> ReadOnlyPipe<'data, A, D, M> {
pub fn new<Sh: Into<StrideShape<D>>>(
data: &'data [A],
shape_input: Sh,
) -> Result<ReadOnlyPipe<'data, A, D, M>> {
let shape_manager = ShapeManager::new(shape_input);
let nelements = data.len() / shape_manager.element_size();
Ok(ReadOnlyPipe {
data,
nelements,
shape_manager,
reader_manager: ReaderManager::new(),
metadata_manager: MetadataManager::new(),
})
}
pub fn get_reader(&self) -> ReadOnlyPipeReader<'_, A, D, M> {
let reader = ReadOnlyPipeReader {
id: Uuid::new_v4(),
pipe: self,
};
self.reader_manager.register_reader(reader.id, 0);
reader
}
pub fn get_metadata(&self) -> Option<M> {
self.metadata_manager.get()
}
pub fn set_metadata(&self, m: M) {
self.metadata_manager.set(m);
}
fn drop_reader(&self, reader: &ReadOnlyPipeReader<'data, A, D, M>) {
self.reader_manager.unregister_reader(reader.id);
}
}
pub struct ReadOnlyPipeReader<'data, A: Copy, D: SizedDimension + Dimension, M: Clone> {
id: Uuid,
pipe: &'data ReadOnlyPipe<'data, A, D, M>,
}
impl<'data, A: Copy, D: SizedDimension + Dimension, M: Clone> ReadOnlyPipeReader<'data, A, D, M> {
pub fn iter_chunks(
&self,
n_to_read: usize,
n_to_consume: usize,
) -> crate::iterator_common::PipeIterator<ReadOnlyPipe<'data, A, D, M>, A, D, M> {
crate::iterator_common::PipeIterator::new(self.pipe, self.id, n_to_read, n_to_consume)
}
}
impl<'data, A: Copy, D: SizedDimension + Dimension, M: Clone> Drop
for ReadOnlyPipeReader<'data, A, D, M>
{
fn drop(&mut self) {
self.pipe.drop_reader(self);
}
}
impl<'data, A: Copy, D: SizedDimension + Dimension, M: Clone> Readable<A, D, M>
for ReadOnlyPipeReader<'data, A, D, M>
{
fn read<R>(
&self,
n_to_read: usize,
n_to_consume: usize,
f: impl FnOnce(ArrayView<A, D::Larger>, PipeState) -> R,
) -> Result<R>
where
D::LargerSize: Into<StrideShape<D::Larger>> + Clone,
D::CurrentSize: Clone,
{
let read_ptr = self
.pipe
.reader_manager
.get_reader_position(self.id)
.ok_or(PipeError::ReadOnlyReaderNotRegistered { reader_id: self.id })?;
array_helpers::validate_bounds(read_ptr, n_to_read, self.pipe.nelements, "Read-only pipe")?;
let data = array_helpers::create_read_view(
self.pipe.data,
read_ptr,
n_to_read,
&self.pipe.shape_manager,
)?;
let pipe_state = PipeState {
write_ptr: self.pipe.nelements, read_ptr,
};
let result = f(data, pipe_state);
self.pipe
.reader_manager
.advance_reader(self.id, n_to_consume);
Ok(result)
}
fn get_metadata(&self) -> Option<M> {
self.pipe.get_metadata()
}
}
pub type ReadOnlyPipeIterator<'a, A, D, M> =
crate::iterator_common::PipeIterator<'a, ReadOnlyPipe<'a, A, D, M>, A, D, M>;
impl<'data, A: Copy, D: SizedDimension + Dimension, M: Clone> ChunkSource<A, D, M>
for ReadOnlyPipe<'data, A, D, M>
{
fn read_chunk_for_iterator<'a>(
&'a self,
reader_id: Uuid,
n_to_read: usize,
n_to_consume: usize,
) -> Result<ChunkGuard<'a, A, D, M>>
where
D::LargerSize: Into<StrideShape<D::Larger>> + Clone,
D::CurrentSize: Clone,
{
let read_ptr = self
.reader_manager
.get_reader_position(reader_id)
.ok_or(PipeError::ReadOnlyReaderNotRegistered { reader_id })?;
array_helpers::validate_bounds(read_ptr, n_to_read, self.nelements, "Read-only pipe")?;
let data =
array_helpers::create_read_view(self.data, read_ptr, n_to_read, &self.shape_manager)?;
let pipe_state = PipeState {
write_ptr: self.nelements, read_ptr,
};
Ok(ChunkGuard::new(
data,
pipe_state,
reader_id,
self,
n_to_consume,
))
}
fn get_reader_ptr(&self, reader_id: Uuid) -> Option<usize> {
self.reader_manager.get_reader_position(reader_id)
}
fn advance_reader_ptr(&self, reader_id: Uuid, n_to_consume: usize) {
self.reader_manager.advance_reader(reader_id, n_to_consume);
}
fn get_metadata(&self) -> Option<M> {
self.metadata_manager.get()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::error::Result;
use ndarray::Ix0;
#[test]
fn test_readonly_pipe_basic() -> Result<()> {
let data: Vec<f64> = (0..100).map(|i| i as f64).collect();
let pipe = ReadOnlyPipe::<f64, Ix0, ()>::new(&data, [])?;
let reader = pipe.get_reader();
let mut values_read = Vec::new();
reader.read(25, 25, |chunk, _state| {
for &value in chunk.iter() {
values_read.push(value);
}
})?;
assert_eq!(values_read.len(), 25);
for (i, &value) in values_read.iter().enumerate() {
assert_eq!(value, i as f64);
}
let mut iterator_values = Vec::new();
for chunk_result in reader.iter_chunks(25, 25) {
let chunk = chunk_result?;
for &value in chunk.iter() {
iterator_values.push(value);
}
}
assert_eq!(iterator_values.len(), 75);
for (i, &value) in iterator_values.iter().enumerate() {
assert_eq!(value, (25 + i) as f64);
}
Ok(())
}
#[test]
fn test_readonly_pipe_multiple_readers() -> Result<()> {
let data: Vec<f64> = (0..50).map(|i| i as f64).collect();
let pipe = ReadOnlyPipe::<f64, Ix0, ()>::new(&data, [])?;
let reader1 = pipe.get_reader();
let reader2 = pipe.get_reader();
let mut reader1_values = Vec::new();
reader1.read(20, 20, |chunk, _state| {
for &value in chunk.iter() {
reader1_values.push(value);
}
})?;
let mut reader2_values = Vec::new();
reader2.read(30, 30, |chunk, _state| {
for &value in chunk.iter() {
reader2_values.push(value);
}
})?;
assert_eq!(reader1_values.len(), 20);
assert_eq!(reader2_values.len(), 30);
for (i, &value) in reader1_values.iter().enumerate() {
assert_eq!(value, i as f64);
}
for (i, &value) in reader2_values.iter().enumerate() {
assert_eq!(value, i as f64);
}
Ok(())
}
#[test]
fn test_readonly_pipe_end_of_data() -> Result<()> {
let data: Vec<f64> = (0..10).map(|i| i as f64).collect();
let pipe = ReadOnlyPipe::<f64, Ix0, ()>::new(&data, [])?;
let reader = pipe.get_reader();
reader.read(10, 10, |chunk, _state| {
assert_eq!(chunk.len(), 10);
})?;
let result = reader.read(5, 5, |_chunk, _state| {});
assert!(result.is_err());
assert!(result.unwrap_err().is_insufficient_data());
Ok(())
}
#[test]
fn test_insufficient_data_error() -> Result<()> {
let data = vec![1.0, 2.0, 3.0];
let pipe = ReadOnlyPipe::<f64, Ix0, ()>::new(&data, [])?;
let reader = pipe.get_reader();
match reader.read(10, 10, |_chunk, _state| {}) {
Err(error) if error.is_insufficient_data() => {
}
other => panic!("Expected insufficient data error, got: {:?}", other),
}
match reader.read(5, 5, |_chunk, _state| {}) {
Err(PipeError::InsufficientData {
context,
requested,
position,
available,
}) => {
assert_eq!(context, "Read-only pipe");
assert_eq!(requested, 5);
assert_eq!(position, 0);
assert_eq!(available, 3);
}
other => panic!("Expected InsufficientData error, got: {:?}", other),
}
Ok(())
}
}