// round_pipers 0.2.0
//
// A way to pipe ndarrays using circular buffers.
// Documentation
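//! Common iterator infrastructure for all readable pipes.
//!
//! This module provides a single iterator, [`PipeIterator`], that works with any [`ChunkSource`],
//! removing the duplication between the former `ReadOnlyPipeIterator` and `PipeIterator`
//! implementations. Each item it yields is a [`ChunkGuard`], which advances the reader's
//! position when it is dropped.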

use crate::error::Result;
use crate::pipe_common::PipeState;
use crate::traits::{ChunkSource, SizedDimension};
use ndarray::{ArrayView, Dimension, StrideShape};
use std::marker::PhantomData;
use uuid::Uuid;

/// RAII guard around a borrowed chunk of pipe data.
///
/// The chunk is exposed as an [`ArrayView`] (via [`ChunkGuard::data`] or through `Deref`).
/// When the guard is dropped it calls `advance_reader_ptr(reader_id, n_to_consume)` on the
/// [`ChunkSource`] it was read from.
pub struct ChunkGuard<'a, A, D, M>
where
    D: SizedDimension + Dimension,
    M: Clone,
{
    /// View of the chunk's elements; it has one more axis than the item dimension `D`.
    data: ArrayView<'a, A, D::Larger>,
    /// Pipe state recorded when the chunk was read.
    pipe_state: PipeState,
    /// Reader whose pointer is advanced on drop.
    reader_id: Uuid,
    /// Source the chunk came from; it receives the advance call on drop.
    chunk_source: &'a dyn ChunkSource<A, D, M>,
    /// Amount passed to `advance_reader_ptr` on drop.
    n_to_consume: usize,
}

impl<'a, A, D: SizedDimension + Dimension, M: Clone> ChunkGuard<'a, A, D, M> {
    /// Wraps a chunk that has just been read from `chunk_source`.
    ///
    /// * `data` - view of the chunk's elements.
    /// * `pipe_state` - pipe state recorded at read time (returned by [`Self::pipe_state`]).
    /// * `reader_id` - reader whose pointer is advanced when the guard is dropped.
    /// * `chunk_source` - source that receives the `advance_reader_ptr` call on drop.
    /// * `n_to_consume` - amount the reader pointer is advanced by on drop.
    pub fn new(
        data: ArrayView<'a, A, D::Larger>,
        pipe_state: PipeState,
        reader_id: Uuid,
        chunk_source: &'a dyn ChunkSource<A, D, M>,
        n_to_consume: usize,
    ) -> Self {
        Self {
            data,
            pipe_state,
            reader_id,
            chunk_source,
            n_to_consume,
        }
    }

    /// Returns the chunk's array view.
    pub fn data(&self) -> &ArrayView<'a, A, D::Larger> {
        &self.data
    }

    /// Returns the pipe state recorded when this chunk was read.
    pub fn pipe_state(&self) -> &PipeState {
        &self.pipe_state
    }

    /// Returns the total number of array elements in this chunk.
    ///
    /// This forwards to ndarray's `len`, which is the product of all axis lengths. It equals
    /// the number of items read (`n_to_read`) only when each item is a scalar (`D = Ix0`,
    /// giving a 1-D view). For higher-dimensional items it also counts the elements inside
    /// each item, so it is *not* the item count in that case.
    pub fn len(&self) -> usize {
        self.data.len()
    }

    /// Returns `true` if the chunk contains no elements, i.e. some axis has length zero.
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    /// Returns the amount the reader pointer will be advanced by when this guard is dropped.
    pub fn n_to_consume(&self) -> usize {
        self.n_to_consume
    }
}

impl<A, D, M> Drop for ChunkGuard<'_, A, D, M>
where
    D: SizedDimension + Dimension,
    M: Clone,
{
    /// Releases the chunk by advancing the reader's pointer by `n_to_consume`.
    ///
    /// `n_to_consume` may be smaller than the amount read, e.g. for overlapping windows.
    fn drop(&mut self) {
        // All three fields are `Copy`, so pull them out before making the call.
        let (source, reader, consumed) = (self.chunk_source, self.reader_id, self.n_to_consume);
        source.advance_reader_ptr(reader, consumed);
    }
}

/// Lets a guard be used directly as its underlying [`ArrayView`] (indexing, `shape()`, ...).
impl<'a, A, D, M> std::ops::Deref for ChunkGuard<'a, A, D, M>
where
    D: SizedDimension + Dimension,
    M: Clone,
{
    type Target = ArrayView<'a, A, D::Larger>;

    fn deref(&self) -> &ArrayView<'a, A, D::Larger> {
        &self.data
    }
}

/// Iterator for any type that implements ChunkSource
pub struct PipeIterator<'a, S, A, D, M>
where
    S: ChunkSource<A, D, M> + 'a,
    A: Copy + 'a,
    D: SizedDimension + Dimension + 'a,
    M: Clone + 'a,
{
    source: &'a S,
    reader_id: Uuid,
    n_to_read: usize,
    n_to_consume: usize,
    finished: bool,
    _phantom: PhantomData<(A, D, M)>,
}

impl<'a, S, A, D, M> PipeIterator<'a, S, A, D, M>
where
    S: ChunkSource<A, D, M> + 'a,
    A: Copy + 'a,
    D: SizedDimension + Dimension + 'a,
    M: Clone + 'a,
{
    /// Builds an iterator that reads on behalf of `reader_id` from `source`.
    ///
    /// Every step requests `n_to_read` items and hands `n_to_consume` to the source.
    /// The iterator starts out unfinished.
    pub fn new(source: &'a S, reader_id: Uuid, n_to_read: usize, n_to_consume: usize) -> Self {
        Self {
            _phantom: PhantomData,
            finished: false,
            n_to_consume,
            n_to_read,
            reader_id,
            source,
        }
    }

    /// Number of items requested from the source per chunk.
    pub fn chunk_size(&self) -> usize {
        self.n_to_read
    }

    /// Number of items each yielded chunk consumes when its guard is dropped.
    pub fn consume_size(&self) -> usize {
        self.n_to_consume
    }

    /// `true` once the source has reported end of stream or insufficient data.
    pub fn is_finished(&self) -> bool {
        self.finished
    }
}

impl<'a, S, A, D, M> Iterator for PipeIterator<'a, S, A, D, M>
where
    S: ChunkSource<A, D, M> + 'a,
    A: Copy + 'a,
    D: SizedDimension + Dimension + 'a,
    M: Clone + 'a,
    D::LargerSize: Into<StrideShape<D::Larger>> + Clone,
    D::CurrentSize: Clone,
{
    type Item = Result<ChunkGuard<'a, A, D, M>>;

    /// Reads the next chunk from the source.
    ///
    /// * `Some(Ok(guard))` - a chunk was read.
    /// * `None` - the iterator is finished. Either an earlier call finished it, or the source
    ///   just reported end of stream or insufficient data, which also marks it finished.
    /// * `Some(Err(e))` - any other error, such as an unregistered reader. The iterator is
    ///   *not* marked finished, so the next call retries the read. Callers that cannot
    ///   recover must stop iterating themselves.
    fn next(&mut self) -> Option<Self::Item> {
        if self.finished {
            return None;
        }

        match self
            .source
            .read_chunk_for_iterator(self.reader_id, self.n_to_read, self.n_to_consume)
        {
            Ok(chunk_guard) => Some(Ok(chunk_guard)),
            // End-of-data conditions end the iteration instead of surfacing as errors.
            Err(e) if e.is_end_of_stream() || e.is_insufficient_data() => {
                self.finished = true;
                None
            }
            // Every other failure is handed to the caller.
            Err(e) => Some(Err(e)),
        }
    }
}

/// After `next` returns `None`, the `finished` flag stays set, so every later call also
/// returns `None`. This makes `Iterator::fuse` a no-op for this iterator.
impl<'a, S, A, D, M> std::iter::FusedIterator for PipeIterator<'a, S, A, D, M>
where
    S: ChunkSource<A, D, M> + 'a,
    A: Copy + 'a,
    D: SizedDimension + Dimension + 'a,
    M: Clone + 'a,
    D::LargerSize: Into<StrideShape<D::Larger>> + Clone,
    D::CurrentSize: Clone,
{
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::{PipeError, Result};
    use crate::traits::ChunkSource;
    use ndarray::{ArrayView, Ix0, StrideShape};
    use std::cell::RefCell;

    /// In-memory `ChunkSource` over scalar `f64` items with a single shared read position.
    ///
    /// The read position only moves in `advance_reader_ptr`, which `ChunkGuard` calls when it
    /// is dropped. Tests therefore exercise the guard's drop-to-consume path.
    struct MockChunkSource {
        data: Vec<f64>,
        read_position: RefCell<usize>,
        max_elements: usize,
    }

    impl MockChunkSource {
        fn new(data: Vec<f64>) -> Self {
            let max_elements = data.len();
            Self {
                data,
                read_position: RefCell::new(0),
                max_elements,
            }
        }
    }

    impl ChunkSource<f64, Ix0, ()> for MockChunkSource {
        fn read_chunk_for_iterator<'a>(
            &'a self,
            reader_id: Uuid,
            n_to_read: usize,
            n_to_consume: usize,
        ) -> Result<ChunkGuard<'a, f64, Ix0, ()>>
        where
            Ix0: SizedDimension + ndarray::Dimension,
            <Ix0 as SizedDimension>::LargerSize:
                Into<StrideShape<<Ix0 as ndarray::Dimension>::Larger>> + Clone,
            <Ix0 as SizedDimension>::CurrentSize: Clone,
        {
            // Copy the position out so no RefCell borrow is held while the guard exists.
            let pos = *self.read_position.borrow();

            if pos + n_to_read > self.max_elements {
                return Err(PipeError::NotEnoughDataRemaining);
            }

            let slice = &self.data[pos..pos + n_to_read];
            let array_view = ArrayView::<f64, <Ix0 as ndarray::Dimension>::Larger>::from_shape(
                n_to_read, slice,
            )?;

            let pipe_state = PipeState {
                write_ptr: self.max_elements,
                read_ptr: pos,
            };

            // The read position is advanced by the returned guard's `Drop`, not here.
            Ok(ChunkGuard::new(
                array_view,
                pipe_state,
                reader_id,
                self,
                n_to_consume,
            ))
        }

        fn get_reader_ptr(&self, _reader_id: Uuid) -> Option<usize> {
            Some(*self.read_position.borrow())
        }

        fn advance_reader_ptr(&self, _reader_id: Uuid, n_to_consume: usize) {
            *self.read_position.borrow_mut() += n_to_consume;
        }

        fn get_metadata(&self) -> Option<()> {
            None
        }
    }

    #[test]
    fn test_unified_iterator_basic() {
        let data: Vec<f64> = (0..10).map(|i| i as f64).collect();
        let source = MockChunkSource::new(data);
        let reader_id = Uuid::new_v4();

        let mut iterator = PipeIterator::new(&source, reader_id, 3, 3);

        // Each guard is dropped before the next read so that its consumption is applied.

        // First chunk: elements 0, 1, 2
        let chunk1 = iterator.next().unwrap().unwrap();
        assert_eq!(chunk1.len(), 3);
        assert_eq!(chunk1[0], 0.0);
        assert_eq!(chunk1[2], 2.0);
        drop(chunk1);

        // Second chunk: elements 3, 4, 5
        let chunk2 = iterator.next().unwrap().unwrap();
        assert_eq!(chunk2.len(), 3);
        assert_eq!(chunk2[0], 3.0);
        assert_eq!(chunk2[2], 5.0);
        drop(chunk2);

        // Third chunk: elements 6, 7, 8
        let chunk3 = iterator.next().unwrap().unwrap();
        assert_eq!(chunk3.len(), 3);
        assert_eq!(chunk3[0], 6.0);
        assert_eq!(chunk3[2], 8.0);
        drop(chunk3);

        // Fourth attempt: only 1 element left (9), but requesting 3
        let chunk4 = iterator.next();
        assert!(chunk4.is_none()); // Iterator should end gracefully
        assert!(iterator.is_finished());
        // Fused: stays exhausted.
        assert!(iterator.next().is_none());
    }

    #[test]
    fn test_unified_iterator_overlapping() {
        let data: Vec<f64> = (0..8).map(|i| i as f64).collect();
        let source = MockChunkSource::new(data);
        let reader_id = Uuid::new_v4();

        // Read 4 elements, but only consume 2 (50% overlap)
        let mut iterator = PipeIterator::new(&source, reader_id, 4, 2);

        // First chunk: elements 0, 1, 2, 3 (consume 2, so next starts at 2)
        let chunk1 = iterator.next().unwrap().unwrap();
        assert_eq!(chunk1.len(), 4);
        assert_eq!(chunk1[0], 0.0);
        assert_eq!(chunk1[3], 3.0);
        drop(chunk1);

        // Second chunk: elements 2, 3, 4, 5 (consume 2, so next starts at 4)
        let chunk2 = iterator.next().unwrap().unwrap();
        assert_eq!(chunk2.len(), 4);
        assert_eq!(chunk2[0], 2.0);
        assert_eq!(chunk2[3], 5.0);
        drop(chunk2);

        // Third chunk: elements 4, 5, 6, 7 (consume 2, so next starts at 6)
        let chunk3 = iterator.next().unwrap().unwrap();
        assert_eq!(chunk3.len(), 4);
        assert_eq!(chunk3[0], 4.0);
        assert_eq!(chunk3[3], 7.0);
        drop(chunk3);

        // Fourth attempt: would start at 6, but only have elements 6, 7 (need 4)
        let chunk4 = iterator.next();
        assert!(chunk4.is_none());
        assert!(iterator.is_finished());
    }

    #[test]
    fn test_chunk_guard_consumes_on_drop() {
        let data: Vec<f64> = (0..6).map(|i| i as f64).collect();
        let source = MockChunkSource::new(data);
        let reader_id = Uuid::new_v4();

        let mut iterator = PipeIterator::new(&source, reader_id, 4, 2);

        let chunk = iterator.next().unwrap().unwrap();
        assert_eq!(chunk.n_to_consume(), 2);
        assert_eq!(chunk.pipe_state().read_ptr, 0);
        // Nothing is consumed while the guard is alive.
        assert_eq!(source.get_reader_ptr(reader_id), Some(0));

        drop(chunk);
        assert_eq!(source.get_reader_ptr(reader_id), Some(2));

        // The next read starts where the dropped guard left the reader.
        let chunk = iterator.next().unwrap().unwrap();
        assert_eq!(chunk[0], 2.0);
        assert_eq!(chunk.pipe_state().read_ptr, 2);
    }

    #[test]
    fn test_iterator_properties() {
        let data: Vec<f64> = vec![0.0; 10];
        let source = MockChunkSource::new(data);
        let reader_id = Uuid::new_v4();

        let iterator = PipeIterator::new(&source, reader_id, 5, 3);

        assert_eq!(iterator.chunk_size(), 5);
        assert_eq!(iterator.consume_size(), 3);
        assert!(!iterator.is_finished());
    }
}