round_pipers 0.2.0

A way to pipe ndarrays using circular buffers
Documentation
use crate::array_helpers;
use crate::error::{PipeError, Result};
use crate::iterator_common::ChunkGuard;
use crate::pipe_common::{MetadataManager, PipeState, ReaderManager, ShapeManager};
use crate::traits::{ChunkSource, Readable, SizedDimension};
use ndarray::{ArrayView, Dimension, StrideShape};
use uuid::Uuid;

/// A pipe over a borrowed, already-populated slice that can only be read.
///
/// Build one with `ReadOnlyPipe::new(my_slice, shape)`. The slice is borrowed,
/// not copied; the `'data` lifetime ties the pipe to the slice it was built from.
pub struct ReadOnlyPipe<'data, A, D, M>
where
    A: Copy,
    D: SizedDimension + Dimension,
    M: Clone,
{
    /// Borrowed backing storage handed to `new`.
    data: &'data [A],
    /// Number of whole elements in `data`, as computed by `new`.
    nelements: usize,
    /// Shape information built from the shape passed to `new`.
    shape_manager: ShapeManager<D>,
    /// Tracks the registered readers and their read positions.
    reader_manager: ReaderManager,
    /// Holds the optional metadata value of type `M`.
    metadata_manager: MetadataManager<M>,
}

impl<'data, A: Copy, D: SizedDimension + Dimension, M: Clone> ReadOnlyPipe<'data, A, D, M> {
    /// Creates a read-only pipe over `data`, with elements shaped by `shape_input`.
    ///
    /// The slice is borrowed rather than copied, so it must outlive the pipe
    /// (enforced by the `'data` lifetime).
    ///
    /// The element count is `data.len()` divided by the shape manager's element
    /// size, rounded down. If the element size is zero the pipe is created
    /// empty (zero elements) instead of panicking on a division by zero.
    ///
    /// # Errors
    ///
    /// This constructor currently always returns `Ok`.
    pub fn new<Sh: Into<StrideShape<D>>>(
        data: &'data [A],
        shape_input: Sh,
    ) -> Result<ReadOnlyPipe<'data, A, D, M>> {
        let shape_manager = ShapeManager::new(shape_input);
        // `checked_div` yields `None` for a zero element size; treat that as
        // "no readable elements" rather than aborting the caller with a panic.
        let nelements = data
            .len()
            .checked_div(shape_manager.element_size())
            .unwrap_or(0);

        Ok(ReadOnlyPipe {
            data,
            nelements,
            shape_manager,
            reader_manager: ReaderManager::new(),
            metadata_manager: MetadataManager::new(),
        })
    }

    /// Returns a new reader positioned at the start of the data.
    ///
    /// The reader gets a fresh random id (`Uuid::new_v4`) and is registered
    /// with the reader manager at position 0.
    pub fn get_reader(&self) -> ReadOnlyPipeReader<'_, A, D, M> {
        let reader = ReadOnlyPipeReader {
            id: Uuid::new_v4(),
            pipe: self,
        };
        // Readers start at 0 for read-only pipes
        self.reader_manager.register_reader(reader.id, 0);
        reader
    }

    /// Returns the value currently reported by the metadata manager, if any.
    pub fn get_metadata(&self) -> Option<M> {
        self.metadata_manager.get()
    }

    /// Hands `m` to the metadata manager.
    pub fn set_metadata(&self, m: M) {
        self.metadata_manager.set(m);
    }

    /// Unregisters `reader`'s id from the reader manager.
    ///
    /// Called from the `Drop` implementation of `ReadOnlyPipeReader`.
    fn drop_reader(&self, reader: &ReadOnlyPipeReader<'data, A, D, M>) {
        self.reader_manager.unregister_reader(reader.id);
    }
}

/// Read handle for a [`ReadOnlyPipe`]; intended to offer the same API as the
/// regular `PipeReader`.
pub struct ReadOnlyPipeReader<'data, A, D, M>
where
    A: Copy,
    D: SizedDimension + Dimension,
    M: Clone,
{
    /// Id under which this reader is registered with the pipe's reader manager.
    id: Uuid,
    /// The pipe this reader reads from.
    pipe: &'data ReadOnlyPipe<'data, A, D, M>,
}

impl<'data, A: Copy, D: SizedDimension + Dimension, M: Clone> ReadOnlyPipeReader<'data, A, D, M> {
    /// Create an iterator that yields ChunkGuard instances with RAII memory management
    /// Same API as regular PipeReader
    pub fn iter_chunks(
        &self,
        n_to_read: usize,
        n_to_consume: usize,
    ) -> crate::iterator_common::PipeIterator<ReadOnlyPipe<'data, A, D, M>, A, D, M> {
        crate::iterator_common::PipeIterator::new(self.pipe, self.id, n_to_read, n_to_consume)
    }
}

impl<'data, A, D, M> Drop for ReadOnlyPipeReader<'data, A, D, M>
where
    A: Copy,
    D: SizedDimension + Dimension,
    M: Clone,
{
    /// Unregisters this reader from its pipe when the reader goes away.
    fn drop(&mut self) {
        let owner = self.pipe;
        owner.drop_reader(self);
    }
}

impl<'data, A, D, M> Readable<A, D, M> for ReadOnlyPipeReader<'data, A, D, M>
where
    A: Copy,
    D: SizedDimension + Dimension,
    M: Clone,
{
    /// Reads `n_to_read` elements at this reader's current position, passes the
    /// view and a `PipeState` to `f`, then advances the reader by `n_to_consume`.
    ///
    /// The reader position is only advanced after `f` has returned; if any
    /// earlier step fails the position is left untouched. `n_to_consume` is not
    /// checked against `n_to_read` here.
    ///
    /// # Errors
    ///
    /// * `PipeError::ReadOnlyReaderNotRegistered` if the reader manager has no
    ///   position for this reader's id.
    /// * Whatever `validate_bounds` or `create_read_view` report.
    fn read<R>(
        &self,
        n_to_read: usize,
        n_to_consume: usize,
        f: impl FnOnce(ArrayView<A, D::Larger>, PipeState) -> R,
    ) -> Result<R>
    where
        D::LargerSize: Into<StrideShape<D::Larger>> + Clone,
        D::CurrentSize: Clone,
    {
        let source = self.pipe;
        let total = source.nelements;

        // Where does this reader currently stand?
        let cursor = match source.reader_manager.get_reader_position(self.id) {
            Some(position) => position,
            None => return Err(PipeError::ReadOnlyReaderNotRegistered { reader_id: self.id }),
        };

        // Reject the request before building a view.
        array_helpers::validate_bounds(cursor, n_to_read, total, "Read-only pipe")?;

        let view =
            array_helpers::create_read_view(source.data, cursor, n_to_read, &source.shape_manager)?;

        // A read-only pipe has no writer, so the end of the data stands in for `write_ptr`.
        let output = f(
            view,
            PipeState {
                write_ptr: total,
                read_ptr: cursor,
            },
        );

        // Consume only once the callback has run.
        source.reader_manager.advance_reader(self.id, n_to_consume);

        Ok(output)
    }

    /// Returns the pipe's current metadata, if any.
    fn get_metadata(&self) -> Option<M> {
        self.pipe.metadata_manager.get()
    }
}

/// Chunk iterator type specialised for [`ReadOnlyPipe`]: a `PipeIterator`
/// whose chunk source is a `ReadOnlyPipe`, with the same lifetime `'a` used for
/// both the iterator and the pipe's data.
pub type ReadOnlyPipeIterator<'a, A, D, M> =
    crate::iterator_common::PipeIterator<'a, ReadOnlyPipe<'a, A, D, M>, A, D, M>;

impl<'data, A, D, M> ChunkSource<A, D, M> for ReadOnlyPipe<'data, A, D, M>
where
    A: Copy,
    D: SizedDimension + Dimension,
    M: Clone,
{
    /// Builds a `ChunkGuard` over `n_to_read` elements at the current position
    /// of the reader identified by `reader_id`.
    ///
    /// The reader position is not advanced here; `n_to_consume` is handed to
    /// the guard together with a reference to this pipe.
    ///
    /// # Errors
    ///
    /// * `PipeError::ReadOnlyReaderNotRegistered` if `reader_id` has no
    ///   registered position.
    /// * Whatever `validate_bounds` or `create_read_view` report.
    fn read_chunk_for_iterator<'a>(
        &'a self,
        reader_id: Uuid,
        n_to_read: usize,
        n_to_consume: usize,
    ) -> Result<ChunkGuard<'a, A, D, M>>
    where
        D::LargerSize: Into<StrideShape<D::Larger>> + Clone,
        D::CurrentSize: Clone,
    {
        let total = self.nelements;

        // Look up where this reader currently stands.
        let cursor = match self.reader_manager.get_reader_position(reader_id) {
            Some(position) => position,
            None => return Err(PipeError::ReadOnlyReaderNotRegistered { reader_id }),
        };

        // Reject the request before building a view.
        array_helpers::validate_bounds(cursor, n_to_read, total, "Read-only pipe")?;

        let view =
            array_helpers::create_read_view(self.data, cursor, n_to_read, &self.shape_manager)?;

        // A read-only pipe has no writer, so the end of the data stands in for `write_ptr`.
        let state = PipeState {
            write_ptr: total,
            read_ptr: cursor,
        };

        Ok(ChunkGuard::new(view, state, reader_id, self, n_to_consume))
    }

    /// Returns the registered position of `reader_id`, or `None` if unknown.
    fn get_reader_ptr(&self, reader_id: Uuid) -> Option<usize> {
        self.reader_manager.get_reader_position(reader_id)
    }

    /// Asks the reader manager to advance `reader_id` by `n_to_consume`.
    fn advance_reader_ptr(&self, reader_id: Uuid, n_to_consume: usize) {
        self.reader_manager.advance_reader(reader_id, n_to_consume);
    }

    /// Returns the value currently reported by the metadata manager, if any.
    fn get_metadata(&self) -> Option<M> {
        self.metadata_manager.get()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::Result;
    use ndarray::Ix0;

    /// Reads the first 25 values with `read`, then drains the rest with `iter_chunks`.
    #[test]
    fn test_readonly_pipe_basic() -> Result<()> {
        // 0.0, 1.0, ..., 99.0
        let data: Vec<f64> = (0..100u32).map(f64::from).collect();

        let pipe = ReadOnlyPipe::<f64, Ix0, ()>::new(&data, [])?;
        let reader = pipe.get_reader();

        // Callback-style read of the first 25 values.
        let mut head: Vec<f64> = Vec::new();
        reader.read(25, 25, |chunk, _state| {
            head.extend(chunk.iter().copied());
        })?;
        assert_eq!(head.as_slice(), &data[..25]);

        // Iterator-style read picks up where the callback read stopped.
        let mut tail: Vec<f64> = Vec::new();
        for chunk_result in reader.iter_chunks(25, 25) {
            let chunk = chunk_result?;
            tail.extend(chunk.iter().copied());
        }

        // The remaining 75 values (25..=99) must come out in order.
        assert_eq!(tail.as_slice(), &data[25..]);

        Ok(())
    }

    /// Two readers on one pipe keep independent positions, both starting at 0.
    #[test]
    fn test_readonly_pipe_multiple_readers() -> Result<()> {
        let data: Vec<f64> = (0..50u32).map(f64::from).collect();
        let pipe = ReadOnlyPipe::<f64, Ix0, ()>::new(&data, [])?;

        let first = pipe.get_reader();
        let second = pipe.get_reader();

        // The first reader consumes 20 elements...
        let mut from_first: Vec<f64> = Vec::new();
        first.read(20, 20, |chunk, _state| {
            from_first.extend(chunk.iter().copied());
        })?;

        // ...which must not move the second reader away from the start.
        let mut from_second: Vec<f64> = Vec::new();
        second.read(30, 30, |chunk, _state| {
            from_second.extend(chunk.iter().copied());
        })?;

        assert_eq!(from_first.as_slice(), &data[..20]);
        assert_eq!(from_second.as_slice(), &data[..30]);

        Ok(())
    }

    /// Reading past the end of the data reports an insufficient-data error.
    #[test]
    fn test_readonly_pipe_end_of_data() -> Result<()> {
        let data: Vec<f64> = (0..10u32).map(f64::from).collect();
        let pipe = ReadOnlyPipe::<f64, Ix0, ()>::new(&data, [])?;
        let reader = pipe.get_reader();

        // Consume everything in one go.
        reader.read(10, 10, |chunk, _state| {
            assert_eq!(chunk.len(), 10);
        })?;

        // Nothing is left, so a further read has to fail.
        let error = reader.read(5, 5, |_chunk, _state| {}).unwrap_err();
        assert!(error.is_insufficient_data());

        Ok(())
    }

    /// Oversized reads fail with `InsufficientData` carrying the request details.
    #[test]
    fn test_insufficient_data_error() -> Result<()> {
        let data: Vec<f64> = vec![1.0, 2.0, 3.0];
        let pipe = ReadOnlyPipe::<f64, Ix0, ()>::new(&data, [])?;
        let reader = pipe.get_reader();

        // Asking for more than exists must be classified as insufficient data.
        let oversized = reader.read(10, 10, |_chunk, _state| {});
        if !matches!(&oversized, Err(error) if error.is_insufficient_data()) {
            panic!("Expected insufficient data error, got: {:?}", oversized);
        }

        // The concrete variant must describe the failed request.
        let still_oversized = reader.read(5, 5, |_chunk, _state| {});
        match still_oversized {
            Err(PipeError::InsufficientData {
                context,
                requested,
                position,
                available,
            }) => {
                assert_eq!(context, "Read-only pipe");
                assert_eq!(requested, 5);
                assert_eq!(position, 0);
                assert_eq!(available, 3);
            }
            other => panic!("Expected InsufficientData error, got: {:?}", other),
        }

        Ok(())
    }
}