mod buffer;
use buffer::CircularBuffer;
use anyhow::{bail, Error, Result};
use ndarray::{ArrayView, ArrayView1, ArrayViewMut, Dim, Dimension, StrideShape};
use std::any::type_name;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::mem::size_of;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering, AtomicBool};
use std::time::Duration;
use std::sync::{RwLock, Arc};
use std::thread::sleep;
use std::clone::Clone;
use uuid::Uuid;
pub trait SizedDimension {
type LargerSize;
type CurrentSize;
fn get_larger_array_size(nel: usize, size: Self::CurrentSize) -> Self::LargerSize;
fn from_array_view(view: ArrayView1<usize>) -> Self::CurrentSize;
}
impl SizedDimension for Dim<[usize; 0]> {
type CurrentSize = ();
type LargerSize = usize;
fn get_larger_array_size(nel: usize, _size: Self::CurrentSize) -> Self::LargerSize {
nel
}
fn from_array_view(_view: ArrayView1<usize>) -> Self::CurrentSize {
()
}
}
impl SizedDimension for Dim<[usize; 1]> {
type CurrentSize = usize;
type LargerSize = (usize, usize);
fn get_larger_array_size(nel: usize, size: Self::CurrentSize) -> Self::LargerSize {
(nel, size)
}
fn from_array_view(view: ArrayView1<usize>) -> Self::CurrentSize {
view[0]
}
}
impl SizedDimension for Dim<[usize; 2]> {
type CurrentSize = (usize, usize);
type LargerSize = (usize, usize, usize);
fn get_larger_array_size(nel: usize, size: Self::CurrentSize) -> Self::LargerSize {
(nel, size.1, size.0)
}
fn from_array_view(view: ArrayView1<usize>) -> Self::CurrentSize {
(view[1], view[0])
}
}
impl SizedDimension for Dim<[usize; 3]> {
type CurrentSize = (usize, usize, usize);
type LargerSize = (usize, usize, usize, usize);
fn get_larger_array_size(nel: usize, size: Self::CurrentSize) -> Self::LargerSize {
(nel, size.2, size.1, size.0)
}
fn from_array_view(view: ArrayView1<usize>) -> Self::CurrentSize {
(view[2], view[1], view[0])
}
}
impl SizedDimension for Dim<[usize; 4]> {
type CurrentSize = (usize, usize, usize, usize);
type LargerSize = (usize, usize, usize, usize, usize);
fn get_larger_array_size(nel: usize, size: Self::CurrentSize) -> Self::LargerSize {
(nel, size.3, size.2, size.1, size.0)
}
fn from_array_view(view: ArrayView1<usize>) -> Self::CurrentSize {
(view[3], view[2], view[1], view[0])
}
}
impl SizedDimension for Dim<[usize; 5]> {
type CurrentSize = (usize, usize, usize, usize, usize);
type LargerSize = (usize, usize, usize, usize, usize, usize);
fn get_larger_array_size(nel: usize, size: Self::CurrentSize) -> Self::LargerSize {
(nel, size.4, size.3, size.2, size.1, size.0)
}
fn from_array_view(view: ArrayView1<usize>) -> Self::CurrentSize {
(view[4], view[3], view[2], view[1], view[0])
}
}
pub struct PipeState {
pub write_ptr: usize,
pub read_ptr: usize,
}
unsafe impl<A: Send, D: SizedDimension + Dimension, M: Clone> Send for Pipe<A, D, M> {}
unsafe impl<A: Send, D: SizedDimension + Dimension, M: Clone> Sync for Pipe<A, D, M> {}
pub struct Pipe<A, D: SizedDimension + Dimension, M: Clone> {
_phantom_type: PhantomData<A>,
name: String,
nelements: usize,
buffer: CircularBuffer,
shape: StrideShape<D>,
shape_tuple: D::CurrentSize,
read_ptrs: RwLock<HashMap<Uuid, usize>>,
write_ptr: AtomicUsize,
returned_writer: AtomicBool,
writer_dropped: AtomicBool,
metadata: RwLock<Option<M>>,
}
pub struct PipeReader<A, D: SizedDimension + Dimension, M: Clone> {
id: Uuid,
pipe: Arc<Pipe<A, D, M>>
}
pub struct PipeWriter<A, D: SizedDimension + Dimension, M: Clone> {
pipe: Arc<Pipe<A, D, M>>
}
impl<A, D: SizedDimension + Dimension, M: Clone> Pipe<A, D, M> {
pub fn new<Sh: Into<StrideShape<D>>>(
name: impl AsRef<Path>,
nelements: usize,
shape_input: Sh,
) -> Result<Pipe<A, D, M>> {
let shape: StrideShape<D> = shape_input.into();
let buffer_size_bytes = nelements * shape.size() * size_of::<A>();
let shape_tuple = D::from_array_view(shape.raw_dim().as_array_view());
println!("{}", type_name::<D::CurrentSize>());
Ok(Pipe {
name: name.as_ref().to_str().unwrap().to_string(),
nelements,
buffer: CircularBuffer::new(name, buffer_size_bytes)?,
shape,
shape_tuple,
read_ptrs: RwLock::new(HashMap::new()),
write_ptr: AtomicUsize::new(0),
returned_writer: AtomicBool::new(false),
writer_dropped: AtomicBool::new(false),
metadata: None.into(),
_phantom_type: PhantomData,
})
}
fn get_metadata(&self) -> Option<M> {
self.metadata.read().unwrap().clone()
}
fn set_metadata(&self, m: M) {
*self.metadata.write().unwrap() = Some(m);
}
fn drop_writer(&self) {
self.writer_dropped.store(true, Ordering::Relaxed);
}
fn drop_reader(&self, reader: &PipeReader<A, D, M>) {
self.read_ptrs.write().unwrap().remove(&reader.id);
}
pub fn get_reader(self: Arc<Self>) -> PipeReader<A, D, M> {
let rdr = PipeReader { id: Uuid::new_v4(), pipe: Arc::clone(&self),
};
self.read_ptrs
.write()
.unwrap()
.insert(rdr.id, self.write_ptr.load(Ordering::Relaxed));
rdr
}
pub fn get_writer(self: Arc<Self>) -> Result<PipeWriter<A, D, M>> {
if self.returned_writer.load(Ordering::Acquire) {
bail!("On pipe: {} There is only one writer and it was already acquired", self.name)
}
let writer = PipeWriter{pipe: Arc::clone(&self)};
self.returned_writer.store(true, Ordering::Release);
Ok(writer)
}
fn write<R>(
&self,
n_to_write: usize,
f: impl FnOnce(ArrayViewMut<A, D::Larger>, PipeState) -> R,
) -> Result<R>
where
D::LargerSize: Into<StrideShape<D::Larger>> + Clone,
D::CurrentSize: Clone,
{
if self.writer_dropped.load(Ordering::Relaxed) {
panic!("On pipe: {} The one writer returned for {} dropped. {} shouldn't be written to any more.", self.name, self.name, self.name)
}
if n_to_write > self.nelements {
bail!(
"On pipe: {} Trying to write {} into a buffer capable of holding {}",
self.name,
n_to_write,
self.nelements
);
}
let mut write_ptr = self.write_ptr.load(Ordering::Acquire);
let write_ptr_mod = write_ptr % self.nelements;
let slice = &mut self.buffer.view_mut::<A>()?
[write_ptr_mod * self.shape.size()..(write_ptr_mod + n_to_write) * self.shape.size()];
let data = ArrayViewMut::<A, D::Larger>::from_shape(
D::get_larger_array_size(n_to_write, self.shape_tuple.clone().into()),
slice,
)?;
let mut last_duration = Duration::new(0, 1);
let mut this_duration = Duration::new(0, 1);
let mut min_read_ptr;
loop {
min_read_ptr = self
.read_ptrs
.read()
.unwrap()
.values()
.map(|read_ptr| {
get_capacity(*read_ptr % self.nelements, write_ptr_mod, self.nelements)
})
.min();
let capacity = min_read_ptr.unwrap_or(self.nelements);
if capacity > n_to_write {
break;
} else {
sleep(this_duration);
let tmp = last_duration;
last_duration = this_duration;
this_duration = last_duration + tmp;
}
}
let r = f(
data,
PipeState {
write_ptr,
read_ptr: min_read_ptr.unwrap_or(write_ptr),
},
);
write_ptr += n_to_write;
self.write_ptr.store(write_ptr, Ordering::Release);
Ok(r)
}
fn read<R>(
&self,
reader: &PipeReader<A, D, M>,
n_to_read: usize,
n_to_consume: usize,
f: impl FnOnce(ArrayView<A, D::Larger>, PipeState) -> R,
) -> Result<R>
where
D::LargerSize: Into<StrideShape<D::Larger>> + Clone,
D::CurrentSize: Clone,
{
if n_to_read > self.nelements {
bail!(
"On pipe: {} Trying to read {} into a buffer capable of holding {}",
self.name,
n_to_read,
self.nelements
);
}
let read_ptr = *(self.read_ptrs.read().unwrap().get(&reader.id)).ok_or(Error::msg(
format!("Reader {} not registered with {}", reader.id, self.name),
))?;
let mut last_duration = Duration::new(0, 1);
let mut this_duration = Duration::new(0, 1);
loop {
if read_ptr + n_to_read <= self.write_ptr.load(Ordering::Relaxed) {
break;
} else {
if self.writer_dropped.load(Ordering::Relaxed) {
bail!("On pipe: {} Writer dropped and there isn't enough data to meet the request of {}", self.name, n_to_read);
}
sleep(this_duration);
let tmp = last_duration;
last_duration = this_duration;
this_duration = last_duration + tmp;
}
}
let read_ptr_mod = read_ptr % self.nelements;
let slice = &self.buffer.view::<A>()?
[read_ptr_mod * self.shape.size()..(read_ptr_mod + n_to_read) * self.shape.size()];
let data = ArrayView::<A, D::Larger>::from_shape(
D::get_larger_array_size(n_to_read, self.shape_tuple.clone().into()),
slice,
)?;
let r = f(
data,
PipeState {
write_ptr: self.write_ptr.load(Ordering::Relaxed),
read_ptr,
},
);
self.read_ptrs
.write()
.unwrap()
.entry(reader.id)
.and_modify(|old_ptr| *old_ptr += n_to_consume);
Ok(r)
}
}
impl<A, D: SizedDimension + Dimension, M: Clone> PipeReader <A, D, M> {
pub fn read<R>(
&self,
n_to_read: usize,
n_to_consume: usize,
f: impl FnOnce(ArrayView<A, D::Larger>, PipeState) -> R,
) -> Result<R>
where
D::LargerSize: Into<StrideShape<D::Larger>> + Clone,
D::CurrentSize: Clone,
{
self.pipe.read(&self, n_to_read, n_to_consume, f)
}
pub fn get_metadata(&self) -> Option<M> {
self.pipe.get_metadata()
}
}
impl<A, D: SizedDimension + Dimension, M: Clone> Drop for PipeReader <A, D, M> {
fn drop(&mut self) {
self.pipe.drop_reader(self);
}
}
impl<A, D: SizedDimension + Dimension, M: Clone> PipeWriter <A, D, M> {
pub fn write<R>(
&self,
n_to_write: usize,
f: impl FnOnce(ArrayViewMut<A, D::Larger>, PipeState) -> R,
) -> Result<R>
where
D::LargerSize: Into<StrideShape<D::Larger>> + Clone,
D::CurrentSize: Clone,
{
self.pipe.write(n_to_write, f)
}
pub fn set_metadata(&self, metadata: &M){
self.pipe.set_metadata(metadata.clone());
}
}
impl<A, D: SizedDimension + Dimension, M: Clone> Drop for PipeWriter <A, D, M> {
fn drop(&mut self) {
self.pipe.drop_writer();
}
}
fn get_capacity(read_ptr: usize, write_ptr: usize, nelements: usize) -> usize {
assert!(read_ptr < nelements);
assert!(write_ptr < nelements);
if write_ptr > read_ptr {
nelements - write_ptr + read_ptr
} else if write_ptr < read_ptr {
read_ptr - write_ptr
} else {
nelements
}
}
#[cfg(test)]
mod test {
use super::*;
use ndarray::{Ix0, Ix4};
#[test]
fn test_capacity() {
assert_eq!(get_capacity(0, 10, 100), 90);
assert_eq!(get_capacity(10, 0, 100), 10);
assert_eq!(get_capacity(0, 0, 100), 100);
}
#[test]
fn testscalar() -> Result<()> {
let pipe = Arc::new(Pipe::<f64, Ix0, ()>::new(Path::new("test"), 32768, [])?);
let reader = pipe.clone().get_reader();
pipe.write(1024, |mut array, _pipe_state| -> Result<()> {
for ii in 0..1024 {
array[ii] = ii as f64;
}
Ok(())
})
??;
pipe.read(&reader, 1024, 1024, |array, _pipe_state| -> Result<()> {
for ii in 0..1024 {
assert_eq!(array[ii], ii as f64);
}
Ok(())
})
??;
Ok(())
}
#[test]
fn test_read_write() -> Result<()> {
let pipe = Arc::new(Pipe::<f64, Ix4, ()>::new(Path::new("test"), 32768, [3, 5, 7, 11])?);
let reader = pipe.clone().get_reader();
for _ in 0..2 {
pipe.write(1024, |mut array, _pipe_state| -> Result<()> {
println!("{}", array.ndim());
for dd in array.shape() {
println!("{}", dd);
}
let mut count = 0.0_f64;
for ii in 0..1024 {
for jj in 0..3 {
for kk in 0..5 {
for ll in 0..7 {
for mm in 0..11 {
array[(ii, jj, kk, ll, mm)] = count;
count += 1.0;
}
}
}
}
}
Ok(())
})
??;
let mut count = 0.0_f64;
for _ in 0..2 {
pipe.read(&reader, 512, 512, |array, _pipe_state| -> Result<()> {
for ii in 0..512 {
for jj in 0..3 {
for kk in 0..5 {
for ll in 0..7 {
for mm in 0..11 {
if array[(ii, jj, kk, ll, mm)] != count {
assert_eq!(array[(ii, jj, kk, ll, mm)], count);
}
count += 1.0;
}
}
}
}
}
Ok(())
})
??;
}
}
let view = pipe.buffer.view_mut::<f64>()?;
for ii in 0..1024 * 3 * 5 * 7 * 11 {
assert_eq!(ii as f64, view[ii]);
assert_eq!(ii as f64, view[ii + 1024 * 3 * 5 * 7 * 11]);
}
Ok(())
}
#[test]
fn test_write_only() -> Result<()> {
let pipe = Arc::new(Pipe::<f64, Ix4, ()>::new(Path::new("test"), 32768, [3, 5, 7, 11])?);
for _ in 0..2 {
pipe.write(1024, |mut array, _pipe_state| -> Result<()> {
println!("{}", array.ndim());
for dd in array.shape() {
println!("{}", dd);
}
let mut count = 0.0_f64;
for ii in 0..1024 {
for jj in 0..3 {
for kk in 0..5 {
for ll in 0..7 {
for mm in 0..11 {
array[(ii, jj, kk, ll, mm)] = count;
count += 1.0;
}
}
}
}
}
Ok(())
})
??;
}
let view = pipe.buffer.view_mut::<f64>()?;
for ii in 0..1024 * 3 * 5 * 7 * 11 {
assert_eq!(ii as f64, view[ii]);
assert_eq!(ii as f64, view[ii + 1024 * 3 * 5 * 7 * 11]);
}
Ok(())
}
}