use crate::array_helpers;
use crate::buffer::CircularBuffer;
use crate::error::{PipeError, Result};
use crate::iterator_common::ChunkGuard;
use crate::pipe_common::{MetadataManager, PipeState, ReaderManager, ShapeManager};
use crate::traits::{ChunkSource, Readable, SizedDimension, Writable};
use bytemuck::Zeroable;
use ndarray::{ArrayView, ArrayViewMut, Dimension, StrideShape};
use std::marker::PhantomData;
use std::mem::size_of;
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
use uuid::Uuid;
const _: () = {
fn assert_send_sync<T: Send + Sync>() {}
fn check_managers<D: SizedDimension + Dimension, M: Clone + Send + Sync>()
where
D::CurrentSize: Send + Sync,
{
assert_send_sync::<ShapeManager<D>>();
assert_send_sync::<ReaderManager>();
assert_send_sync::<MetadataManager<M>>();
}
};
unsafe impl<A: Copy + Send + Zeroable, D: SizedDimension + Dimension, M: Clone + Send + Sync> Send
for Pipe<A, D, M>
where
D::CurrentSize: Send + Sync,
{
}
unsafe impl<A: Copy + Send + Zeroable, D: SizedDimension + Dimension, M: Clone + Send + Sync> Sync
for Pipe<A, D, M>
where
D::CurrentSize: Send + Sync,
{
}
pub struct Pipe<A: Copy + Zeroable, D: SizedDimension + Dimension, M: Clone> {
_phantom_type: PhantomData<A>,
name: String,
buffer: CircularBuffer,
shape_manager: ShapeManager<D>,
reader_manager: ReaderManager,
write_ptr: AtomicUsize,
returned_writer: AtomicBool,
writer_dropped: AtomicBool,
metadata_manager: MetadataManager<M>,
}
pub struct PipeReader<A: Copy + Zeroable, D: SizedDimension + Dimension, M: Clone> {
id: Uuid,
pipe: Arc<Pipe<A, D, M>>,
}
pub struct PipeWriter<A: Copy + Zeroable, D: SizedDimension + Dimension, M: Clone> {
pipe: Arc<Pipe<A, D, M>>,
}
pub type PipeIterator<'a, A, D, M> =
crate::iterator_common::PipeIterator<'a, Pipe<A, D, M>, A, D, M>;
impl<A: Copy + Zeroable, D: SizedDimension + Dimension, M: Clone> Pipe<A, D, M> {
pub fn new<Sh: Into<StrideShape<D>>>(
name: impl AsRef<Path>,
nelements: usize,
shape_input: Sh,
) -> Result<Pipe<A, D, M>> {
let shape_manager = ShapeManager::new(shape_input);
let buffer_size_bytes = nelements * shape_manager.element_size() * size_of::<A>();
Ok(Pipe {
name: name.as_ref().to_str().unwrap().to_string(),
buffer: CircularBuffer::new(name, buffer_size_bytes)?,
shape_manager,
reader_manager: ReaderManager::new(),
write_ptr: AtomicUsize::new(0),
returned_writer: AtomicBool::new(false),
writer_dropped: AtomicBool::new(false),
metadata_manager: MetadataManager::new(),
_phantom_type: PhantomData,
})
}
fn nelements(&self) -> usize {
let element_size_bytes = self.shape_manager.element_size() * size_of::<A>();
self.buffer.size_bytes() / element_size_bytes
}
fn get_metadata(&self) -> Option<M> {
self.metadata_manager.get()
}
fn set_metadata(&self, m: M) {
self.metadata_manager.set(m);
}
fn drop_writer(&self) {
self.writer_dropped.store(true, Ordering::Relaxed);
}
fn drop_reader(&self, reader: &PipeReader<A, D, M>) {
self.reader_manager.unregister_reader(reader.id);
}
pub fn get_reader(self: Arc<Self>) -> PipeReader<A, D, M> {
let rdr = PipeReader {
id: Uuid::new_v4(),
pipe: Arc::clone(&self),
};
self.reader_manager
.register_reader(rdr.id, self.write_ptr.load(Ordering::Relaxed));
rdr
}
pub fn get_writer(self: Arc<Self>) -> Result<PipeWriter<A, D, M>> {
if self.returned_writer.load(Ordering::Acquire) {
return Err(PipeError::WriterAlreadyAcquired {
pipe_name: self.name.clone(),
});
}
let writer = PipeWriter {
pipe: Arc::clone(&self),
};
self.returned_writer.store(true, Ordering::Release);
Ok(writer)
}
fn write<R>(
&self,
n_to_write: usize,
f: impl FnOnce(ArrayViewMut<A, D::Larger>, PipeState) -> R,
) -> Result<R>
where
D::LargerSize: Into<StrideShape<D::Larger>> + Clone,
D::CurrentSize: Clone,
{
if self.writer_dropped.load(Ordering::Relaxed) {
panic!("On pipe: {} The one writer returned for {} dropped. {} shouldn't be written to any more.", self.name, self.name, self.name)
}
let nelements = self.nelements();
if n_to_write > nelements {
return Err(PipeError::WriteCapacityExceeded {
pipe_name: self.name.clone(),
requested: n_to_write,
capacity: nelements,
});
}
let mut write_ptr = self.write_ptr.load(Ordering::Acquire);
let write_ptr_mod = write_ptr % nelements;
let buffer_slice = &mut self.buffer.view_mut::<A>()?;
let data = array_helpers::create_write_view(
buffer_slice,
write_ptr_mod,
n_to_write,
&self.shape_manager,
)?;
let mut last_duration = Duration::new(0, 1);
let mut this_duration = Duration::new(0, 1);
let mut min_read_ptr;
loop {
min_read_ptr = self.reader_manager.get_min_distance_from(write_ptr);
let capacity = nelements - min_read_ptr.unwrap_or(0);
if capacity > n_to_write {
break;
}
sleep(this_duration);
let tmp = last_duration;
last_duration = this_duration;
this_duration = last_duration + tmp;
}
let r = f(
data,
PipeState {
write_ptr,
read_ptr: min_read_ptr.unwrap_or(write_ptr),
},
);
write_ptr += n_to_write;
self.write_ptr.store(write_ptr, Ordering::Release);
Ok(r)
}
fn read<R>(
&self,
reader: &PipeReader<A, D, M>,
n_to_read: usize,
n_to_consume: usize,
f: impl FnOnce(ArrayView<A, D::Larger>, PipeState) -> R,
) -> Result<R>
where
D::LargerSize: Into<StrideShape<D::Larger>> + Clone,
D::CurrentSize: Clone,
{
let nelements = self.nelements();
if n_to_read > nelements {
return Err(PipeError::ReadCapacityExceeded {
pipe_name: self.name.clone(),
requested: n_to_read,
capacity: nelements,
});
}
let read_ptr = self.reader_manager.get_reader_position(reader.id).ok_or(
PipeError::ReaderNotRegistered {
reader_id: reader.id,
pipe_name: self.name.clone(),
},
)?;
let mut last_duration = Duration::new(0, 1);
let mut this_duration = Duration::new(0, 1);
loop {
if read_ptr + n_to_read <= self.write_ptr.load(Ordering::Relaxed) {
break;
}
if self.writer_dropped.load(Ordering::Relaxed) {
return Err(PipeError::WriterDroppedInsufficientData {
pipe_name: self.name.clone(),
requested: n_to_read,
});
}
sleep(this_duration);
let tmp = last_duration;
last_duration = this_duration;
this_duration = last_duration + tmp;
}
let buffer_slice = &self.buffer.view::<A>()?;
let data = array_helpers::create_read_view(
buffer_slice,
read_ptr,
n_to_read,
&self.shape_manager,
)?;
let r = f(
data,
PipeState {
write_ptr: self.write_ptr.load(Ordering::Relaxed),
read_ptr,
},
);
self.reader_manager.advance_reader(reader.id, n_to_consume);
Ok(r)
}
}
impl<A: Copy + Zeroable, D: SizedDimension + Dimension, M: Clone> ChunkSource<A, D, M>
for Pipe<A, D, M>
{
fn read_chunk_for_iterator<'a>(
&'a self,
reader_id: Uuid,
n_to_read: usize,
n_to_consume: usize,
) -> Result<ChunkGuard<'a, A, D, M>>
where
D::LargerSize: Into<StrideShape<D::Larger>> + Clone,
D::CurrentSize: Clone,
{
let nelements = self.nelements();
if n_to_read > nelements {
return Err(PipeError::ReadCapacityExceeded {
pipe_name: self.name.clone(),
requested: n_to_read,
capacity: nelements,
});
}
let mut last_duration = Duration::new(0, 1);
let mut this_duration = Duration::new(0, 1);
let max_sleep_duration = Duration::new(1, 0);
let read_ptr = loop {
let current_ptr = self.reader_manager.get_reader_position(reader_id).ok_or(
PipeError::ReaderNotRegistered {
reader_id,
pipe_name: self.name.clone(),
},
)?;
if current_ptr + n_to_read <= self.write_ptr.load(Ordering::Relaxed) {
break current_ptr;
}
if self.writer_dropped.load(Ordering::Relaxed) {
return Err(PipeError::WriterDroppedInsufficientData {
pipe_name: self.name.clone(),
requested: n_to_read,
});
}
sleep(this_duration);
let tmp = last_duration;
last_duration = this_duration;
this_duration = (last_duration + tmp).min(max_sleep_duration);
};
let buffer_slice = &self.buffer.view::<A>()?;
let data = array_helpers::create_read_view(
buffer_slice,
read_ptr,
n_to_read,
&self.shape_manager,
)?;
let pipe_state = PipeState {
write_ptr: self.write_ptr.load(Ordering::Relaxed),
read_ptr,
};
Ok(ChunkGuard::new(
data,
pipe_state,
reader_id,
self,
n_to_consume,
))
}
fn get_reader_ptr(&self, reader_id: Uuid) -> Option<usize> {
self.reader_manager.get_reader_position(reader_id)
}
fn advance_reader_ptr(&self, reader_id: Uuid, n_to_consume: usize) {
self.reader_manager.advance_reader(reader_id, n_to_consume);
}
fn get_metadata(&self) -> Option<M> {
self.metadata_manager.get()
}
}
impl<A: Copy + Zeroable, D: SizedDimension + Dimension, M: Clone> Readable<A, D, M>
for PipeReader<A, D, M>
{
fn read<R>(
&self,
n_to_read: usize,
n_to_consume: usize,
f: impl FnOnce(ArrayView<A, D::Larger>, PipeState) -> R,
) -> Result<R>
where
D::LargerSize: Into<StrideShape<D::Larger>> + Clone,
D::CurrentSize: Clone,
{
self.pipe.read(self, n_to_read, n_to_consume, f)
}
fn get_metadata(&self) -> Option<M> {
self.pipe.get_metadata()
}
}
impl<A: Copy + Zeroable, D: SizedDimension + Dimension, M: Clone> PipeReader<A, D, M> {
pub fn iter_chunks(&self, n_to_read: usize, n_to_consume: usize) -> PipeIterator<A, D, M> {
crate::iterator_common::PipeIterator::new(&self.pipe, self.id, n_to_read, n_to_consume)
}
}
impl<A: Copy + Zeroable, D: SizedDimension + Dimension, M: Clone> Drop for PipeReader<A, D, M> {
fn drop(&mut self) {
self.pipe.drop_reader(self);
}
}
impl<A: Copy + Zeroable, D: SizedDimension + Dimension, M: Clone> Writable<A, D, M>
for PipeWriter<A, D, M>
{
fn write<R>(
&self,
n_to_write: usize,
f: impl FnOnce(ArrayViewMut<A, D::Larger>, PipeState) -> R,
) -> Result<R>
where
D::LargerSize: Into<StrideShape<D::Larger>> + Clone,
D::CurrentSize: Clone,
{
self.pipe.write(n_to_write, f)
}
fn set_metadata(&self, metadata: &M) {
self.pipe.set_metadata(metadata.clone());
}
}
impl<A: Copy + Zeroable, D: SizedDimension + Dimension, M: Clone> Drop for PipeWriter<A, D, M> {
fn drop(&mut self) {
self.pipe.drop_writer();
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::error::Result;
use ndarray::{Ix0, Ix4};
use std::path::Path;
use std::sync::Arc;
#[test]
fn testscalar() -> Result<()> {
let pipe = Arc::new(Pipe::<f64, Ix0, ()>::new(Path::new("test"), 32768, [])?);
let reader = pipe.clone().get_reader();
pipe.write(1024, |mut array, _pipe_state| -> Result<()> {
for ii in 0..1024 {
array[ii] = ii as f64;
}
Ok(())
})??;
pipe.read(&reader, 1024, 1024, |array, _pipe_state| -> Result<()> {
for ii in 0..1024 {
assert_eq!(array[ii], ii as f64);
}
Ok(())
})??;
Ok(())
}
#[test]
fn test_read_write() -> Result<()> {
let pipe = Arc::new(Pipe::<f64, Ix4, ()>::new(
Path::new("test"),
32768,
[3, 5, 7, 11],
)?);
let reader = pipe.clone().get_reader();
for _ in 0..2 {
pipe.write(1024, |mut array, _pipe_state| -> Result<()> {
println!("{}", array.ndim());
for dd in array.shape() {
println!("{}", dd);
}
let mut count = 0.0_f64;
for ii in 0..1024 {
for jj in 0..3 {
for kk in 0..5 {
for ll in 0..7 {
for mm in 0..11 {
array[(ii, jj, kk, ll, mm)] = count;
count += 1.0;
}
}
}
}
}
Ok(())
})??;
let mut count = 0.0_f64;
for _ in 0..2 {
pipe.read(&reader, 512, 512, |array, _pipe_state| -> Result<()> {
for ii in 0..512 {
for jj in 0..3 {
for kk in 0..5 {
for ll in 0..7 {
for mm in 0..11 {
if array[(ii, jj, kk, ll, mm)] != count {
assert_eq!(array[(ii, jj, kk, ll, mm)], count);
}
count += 1.0;
}
}
}
}
}
Ok(())
})??;
}
}
let view = pipe.buffer.view_mut::<f64>()?;
for ii in 0..1024 * 3 * 5 * 7 * 11 {
assert_eq!(ii as f64, view[ii]);
assert_eq!(ii as f64, view[ii + 1024 * 3 * 5 * 7 * 11]);
}
Ok(())
}
#[test]
fn test_write_only() -> Result<()> {
let pipe = Arc::new(Pipe::<f64, Ix4, ()>::new(
Path::new("test"),
32768,
[3, 5, 7, 11],
)?);
for _ in 0..2 {
pipe.write(1024, |mut array, _pipe_state| -> Result<()> {
println!("{}", array.ndim());
for dd in array.shape() {
println!("{}", dd);
}
let mut count = 0.0_f64;
for ii in 0..1024 {
for jj in 0..3 {
for kk in 0..5 {
for ll in 0..7 {
for mm in 0..11 {
array[(ii, jj, kk, ll, mm)] = count;
count += 1.0;
}
}
}
}
}
Ok(())
})??;
}
let view = pipe.buffer.view_mut::<f64>()?;
for ii in 0..1024 * 3 * 5 * 7 * 11 {
assert_eq!(ii as f64, view[ii]);
assert_eq!(ii as f64, view[ii + 1024 * 3 * 5 * 7 * 11]);
}
Ok(())
}
#[test]
fn test_raii_iterator() -> Result<()> {
let pipe = Arc::new(Pipe::<f64, Ix0, ()>::new(Path::new("iter_test"), 1024, [])?);
let reader = pipe.clone().get_reader();
let writer = pipe.clone().get_writer()?;
writer.write(100, |mut array, _state| {
for i in 0..100 {
array[i] = i as f64;
}
})?;
drop(writer);
let mut chunks_processed = 0;
let mut total_elements = 0;
let mut expected_value = 0.0;
for chunk_result in reader.iter_chunks(25, 25) {
let chunk = chunk_result?;
chunks_processed += 1;
total_elements += chunk.len();
for &value in chunk.iter() {
assert_eq!(value, expected_value);
expected_value += 1.0;
}
}
assert_eq!(chunks_processed, 4); assert_eq!(total_elements, 100);
assert_eq!(expected_value, 100.0);
Ok(())
}
#[test]
fn test_raii_iterator_early_drop() -> Result<()> {
let pipe = Arc::new(Pipe::<f64, Ix0, ()>::new(
Path::new("iter_early_drop"),
1024,
[],
)?);
let reader = pipe.clone().get_reader();
let writer = pipe.clone().get_writer()?;
writer.write(100, |mut array, _state| {
for i in 0..100 {
array[i] = i as f64;
}
})?;
drop(writer);
let mut chunk_count = 0;
for chunk_result in reader.iter_chunks(25, 25).take(2) {
let chunk = chunk_result?;
chunk_count += 1;
if chunk_count == 1 {
for (i, &value) in chunk.iter().enumerate() {
assert_eq!(value, i as f64);
}
} else if chunk_count == 2 {
for (i, &value) in chunk.iter().enumerate() {
assert_eq!(value, (25 + i) as f64);
}
}
}
assert_eq!(chunk_count, 2);
reader.read(25, 25, |array, _state| {
for (i, &value) in array.iter().enumerate() {
assert_eq!(value, (50 + i) as f64);
}
})?;
Ok(())
}
#[test]
fn test_raii_iterator_peek_semantics() -> Result<()> {
let pipe = Arc::new(Pipe::<f64, Ix0, ()>::new(Path::new("iter_peek"), 1024, [])?);
let reader = pipe.clone().get_reader();
let writer = pipe.clone().get_writer()?;
writer.write(50, |mut array, _state| {
for i in 0..50 {
array[i] = i as f64;
}
})?;
drop(writer);
let mut iteration = 0;
for chunk_result in reader.iter_chunks(25, 10) {
let chunk = chunk_result?;
iteration += 1;
if iteration == 1 {
for (i, &value) in chunk.iter().enumerate() {
assert_eq!(value, i as f64);
}
} else if iteration == 2 {
for (i, &value) in chunk.iter().enumerate() {
assert_eq!(value, (10 + i) as f64);
}
} else if iteration == 3 {
for (i, &value) in chunk.iter().enumerate() {
assert_eq!(value, (20 + i) as f64);
}
break; }
}
assert_eq!(iteration, 3);
Ok(())
}
#[test]
fn test_nelements_computation() -> Result<()> {
let pipe_scalar = Arc::new(Pipe::<f64, Ix0, ()>::new(
Path::new("test_nelements_scalar"),
1000,
[],
)?);
let buffer_size = pipe_scalar.buffer.size_bytes();
let element_size_bytes = pipe_scalar.shape_manager.element_size() * size_of::<f64>();
let expected_nelements = buffer_size / element_size_bytes;
assert_eq!(pipe_scalar.nelements(), expected_nelements);
assert!(pipe_scalar.nelements() >= 1000);
let pipe_multi = Arc::new(Pipe::<f64, Ix4, ()>::new(
Path::new("test_nelements_multi"),
100,
[2, 3, 4, 5],
)?);
let buffer_size_multi = pipe_multi.buffer.size_bytes();
let element_size_bytes_multi = pipe_multi.shape_manager.element_size() * size_of::<f64>();
let expected_nelements_multi = buffer_size_multi / element_size_bytes_multi;
assert_eq!(pipe_multi.nelements(), expected_nelements_multi);
assert!(pipe_multi.nelements() >= 100);
Ok(())
}
#[test]
fn test_writer_already_acquired_error() -> Result<()> {
use crate::error::PipeError;
let pipe = Arc::new(Pipe::<f64, Ix0, ()>::new(
Path::new("test_writer_error"),
100,
[],
)?);
let _writer1 = pipe.clone().get_writer()?;
match pipe.clone().get_writer() {
Err(PipeError::WriterAlreadyAcquired { pipe_name }) => {
assert_eq!(pipe_name, "test_writer_error");
}
Ok(_) => panic!("Expected WriterAlreadyAcquired error, but got Ok"),
Err(other_error) => {
panic!("Expected WriterAlreadyAcquired error, got: {}", other_error)
}
}
Ok(())
}
}