use crate::error::Result;
use crate::pipe_common::PipeState;
use crate::traits::{ChunkSource, SizedDimension};
use ndarray::{ArrayView, Dimension, StrideShape};
use std::marker::PhantomData;
use uuid::Uuid;
pub struct ChunkGuard<'a, A, D: SizedDimension + Dimension, M: Clone> {
data: ArrayView<'a, A, D::Larger>,
pipe_state: PipeState,
reader_id: Uuid,
chunk_source: &'a dyn ChunkSource<A, D, M>,
n_to_consume: usize,
}
impl<'a, A, D: SizedDimension + Dimension, M: Clone> ChunkGuard<'a, A, D, M> {
pub fn new(
data: ArrayView<'a, A, D::Larger>,
pipe_state: PipeState,
reader_id: Uuid,
chunk_source: &'a dyn ChunkSource<A, D, M>,
n_to_consume: usize,
) -> Self {
Self {
data,
pipe_state,
reader_id,
chunk_source,
n_to_consume,
}
}
pub fn data(&self) -> &ArrayView<'a, A, D::Larger> {
&self.data
}
pub fn pipe_state(&self) -> &PipeState {
&self.pipe_state
}
pub fn len(&self) -> usize {
self.data.len()
}
pub fn is_empty(&self) -> bool {
self.data.is_empty()
}
pub fn n_to_consume(&self) -> usize {
self.n_to_consume
}
}
impl<'a, A, D: SizedDimension + Dimension, M: Clone> Drop for ChunkGuard<'a, A, D, M> {
fn drop(&mut self) {
self.chunk_source
.advance_reader_ptr(self.reader_id, self.n_to_consume);
}
}
impl<'a, A, D: SizedDimension + Dimension, M: Clone> std::ops::Deref for ChunkGuard<'a, A, D, M> {
type Target = ArrayView<'a, A, D::Larger>;
fn deref(&self) -> &Self::Target {
&self.data
}
}
pub struct PipeIterator<'a, S, A, D, M>
where
S: ChunkSource<A, D, M> + 'a,
A: Copy + 'a,
D: SizedDimension + Dimension + 'a,
M: Clone + 'a,
{
source: &'a S,
reader_id: Uuid,
n_to_read: usize,
n_to_consume: usize,
finished: bool,
_phantom: PhantomData<(A, D, M)>,
}
impl<'a, S, A, D, M> PipeIterator<'a, S, A, D, M>
where
S: ChunkSource<A, D, M> + 'a,
A: Copy + 'a,
D: SizedDimension + Dimension + 'a,
M: Clone + 'a,
{
pub fn new(source: &'a S, reader_id: Uuid, n_to_read: usize, n_to_consume: usize) -> Self {
Self {
source,
reader_id,
n_to_read,
n_to_consume,
finished: false,
_phantom: PhantomData,
}
}
pub fn is_finished(&self) -> bool {
self.finished
}
pub fn chunk_size(&self) -> usize {
self.n_to_read
}
pub fn consume_size(&self) -> usize {
self.n_to_consume
}
}
impl<'a, S, A, D, M> Iterator for PipeIterator<'a, S, A, D, M>
where
S: ChunkSource<A, D, M> + 'a,
A: Copy + 'a,
D: SizedDimension + Dimension + 'a,
M: Clone + 'a,
D::LargerSize: Into<StrideShape<D::Larger>> + Clone,
D::CurrentSize: Clone,
{
type Item = Result<ChunkGuard<'a, A, D, M>>;
fn next(&mut self) -> Option<Self::Item> {
if self.finished {
return None;
}
match self
.source
.read_chunk_for_iterator(self.reader_id, self.n_to_read, self.n_to_consume)
{
Ok(chunk_guard) => Some(Ok(chunk_guard)),
Err(e) => {
if e.is_end_of_stream() || e.is_insufficient_data() {
self.finished = true;
None
} else {
Some(Err(e))
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::error::{PipeError, Result};
use crate::traits::ChunkSource;
use ndarray::{ArrayView, Ix0, StrideShape};
use std::cell::RefCell;
struct MockChunkSource {
data: Vec<f64>,
read_position: RefCell<usize>,
max_elements: usize,
}
impl MockChunkSource {
fn new(data: Vec<f64>) -> Self {
let max_elements = data.len();
Self {
data,
read_position: RefCell::new(0),
max_elements,
}
}
}
impl ChunkSource<f64, Ix0, ()> for MockChunkSource {
fn read_chunk_for_iterator<'a>(
&'a self,
_reader_id: Uuid,
n_to_read: usize,
n_to_consume: usize,
) -> Result<ChunkGuard<'a, f64, Ix0, ()>>
where
Ix0: SizedDimension + ndarray::Dimension,
<Ix0 as SizedDimension>::LargerSize:
Into<StrideShape<<Ix0 as ndarray::Dimension>::Larger>> + Clone,
<Ix0 as SizedDimension>::CurrentSize: Clone,
{
let mut pos = self.read_position.borrow_mut();
if *pos + n_to_read > self.max_elements {
return Err(PipeError::NotEnoughDataRemaining);
}
let slice = &self.data[*pos..*pos + n_to_read];
let array_view = ArrayView::<f64, <Ix0 as ndarray::Dimension>::Larger>::from_shape(
n_to_read, slice,
)?;
let pipe_state = PipeState {
write_ptr: self.max_elements,
read_ptr: *pos,
};
*pos += n_to_consume;
Ok(ChunkGuard::new(
array_view,
pipe_state,
_reader_id,
self,
n_to_consume,
))
}
fn get_reader_ptr(&self, _reader_id: Uuid) -> Option<usize> {
Some(*self.read_position.borrow())
}
fn advance_reader_ptr(&self, _reader_id: Uuid, _n_to_consume: usize) {
}
fn get_metadata(&self) -> Option<()> {
None
}
}
#[test]
fn test_unified_iterator_basic() {
let data: Vec<f64> = (0..10).map(|i| i as f64).collect();
let source = MockChunkSource::new(data);
let reader_id = Uuid::new_v4();
let mut iterator = PipeIterator::new(&source, reader_id, 3, 3);
let chunk1 = iterator.next().unwrap().unwrap();
assert_eq!(chunk1.len(), 3);
assert_eq!(chunk1[0], 0.0);
assert_eq!(chunk1[2], 2.0);
let chunk2 = iterator.next().unwrap().unwrap();
assert_eq!(chunk2.len(), 3);
assert_eq!(chunk2[0], 3.0);
assert_eq!(chunk2[2], 5.0);
let chunk3 = iterator.next().unwrap().unwrap();
assert_eq!(chunk3.len(), 3);
assert_eq!(chunk3[0], 6.0);
assert_eq!(chunk3[2], 8.0);
let chunk4 = iterator.next();
assert!(chunk4.is_none()); assert!(iterator.is_finished());
}
#[test]
fn test_unified_iterator_overlapping() {
let data: Vec<f64> = (0..8).map(|i| i as f64).collect();
let source = MockChunkSource::new(data);
let reader_id = Uuid::new_v4();
let mut iterator = PipeIterator::new(&source, reader_id, 4, 2);
let chunk1 = iterator.next().unwrap().unwrap();
assert_eq!(chunk1.len(), 4);
assert_eq!(chunk1[0], 0.0);
assert_eq!(chunk1[3], 3.0);
let chunk2 = iterator.next().unwrap().unwrap();
assert_eq!(chunk2.len(), 4);
assert_eq!(chunk2[0], 2.0);
assert_eq!(chunk2[3], 5.0);
let chunk3 = iterator.next().unwrap().unwrap();
assert_eq!(chunk3.len(), 4);
assert_eq!(chunk3[0], 4.0);
assert_eq!(chunk3[3], 7.0);
let chunk4 = iterator.next();
assert!(chunk4.is_none());
assert!(iterator.is_finished());
}
#[test]
fn test_iterator_properties() {
let data: Vec<f64> = vec![0.0; 10];
let source = MockChunkSource::new(data);
let reader_id = Uuid::new_v4();
let iterator = PipeIterator::new(&source, reader_id, 5, 3);
assert_eq!(iterator.chunk_size(), 5);
assert_eq!(iterator.consume_size(), 3);
assert!(!iterator.is_finished());
}
}