use std::fmt::Debug;
use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering};
use std::sync::Arc;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum ChannelError {
#[error("channel no longer contains the requested value")]
IdTooOld(isize),
#[error("value with the given ID has not yet been written to the channel")]
IDNotYetWritten,
#[error("the given index is invalid. Index's must be between 0 and isize::MAX")]
InvalidIndex,
#[error("operation timed out")]
Timeout,
#[error("channel buffer size cannot be zero")]
BufferTooSmall,
#[error("channel buffer size cannot exceed isize::MAX")]
BufferTooBig,
#[error("channel is being completely overwritten before a read can take place")]
Overloaded,
#[error("channel has been closed")]
Closed,
#[error("cannot request more than buffer size elements at once")]
ReadTooLarge,
}
/// Core state of a broadcast channel: a fixed-size ring buffer written by
/// producers and read (by id) by any number of consumers.
#[derive(Debug)]
pub(crate) struct Gemino<T> {
// Heap-allocated ring buffer held as a raw pointer so it can be mutated
// through `&self`; allocated in `new` via `Box::into_raw` and reclaimed
// with `Box::from_raw` in `Drop`.
inner: *mut Vec<T>,
// Next id to hand to a writer (`fetch_add` in `send`).
write_head: AtomicIsize,
// Latest fully committed id; -1 means nothing has been written yet.
read_head: AtomicIsize,
// Buffer size, stored as isize so id arithmetic stays in one signedness.
capacity: isize,
// Wakes blocked/async readers when a value commits or the channel closes.
event: event_listener::Event,
// Set once by `close`; checked via `closed()`.
closed: AtomicBool,
// One lock per buffer cell: writers hold `write` while overwriting a cell,
// cloning readers hold `read` so they never clone a half-replaced value.
cell_locks: Vec<parking_lot::RwLock<()>>,
}
/// Read-side operations, implemented twice below: a `T: Clone` version that
/// takes per-cell read locks, and a `T: Copy` specialization that reads
/// optimistically and validates afterwards.
pub trait Channel<T> {
/// Returns the value with the given id if it is still in the buffer.
fn try_get(&self, id: usize) -> Result<T, ChannelError>;
/// Returns the most recently committed value together with its id.
fn get_latest(&self) -> Result<(T, isize), ChannelError>;
/// Appends every value from `from_id` through the latest committed id into
/// `into`; returns the (first, last) ids actually read — the first may be
/// newer than `from_id` if the writer has lapped it.
fn read_batch_from(
&self,
from_id: usize,
into: &mut Vec<T>,
) -> Result<(isize, isize), ChannelError>;
}
// SAFETY: all access to the shared buffer is synchronized by the atomics and
// the per-cell RwLocks. `Sync` additionally requires `T: Send + Sync`: a
// shared `&Gemino<T>` lets any thread insert values (`send` moves a `T`
// across threads, needing `T: Send`) and clone values out through shared
// `&T` references into the buffer (needing `T: Sync`). The previous blanket
// `unsafe impl<T>` had no bounds, which would unsoundly allow e.g.
// `Gemino<Rc<_>>` to be shared across threads.
unsafe impl<T: Send + Sync> Sync for Gemino<T> {}
// SAFETY: moving the channel to another thread moves its owned `T` buffer
// with it, so `T: Send` suffices.
unsafe impl<T: Send> Send for Gemino<T> {}
impl<T> Drop for Gemino<T> {
fn drop(&mut self) {
let mut inner;
unsafe {
// Reclaim the buffer that `new` leaked via `Box::into_raw`.
inner = Box::from_raw(self.inner);
}
// `read_head` holds the latest committed id, so exactly
// `read_head + 1` cells have ever been initialized (-1 => none).
let length = (self.read_head.load(Ordering::Acquire) + 1) as usize;
if length < self.capacity as usize {
// Fewer writes than capacity: shrink `len` so the Vec's Drop only
// runs destructors on initialized cells — the tail is still the
// uninitialized memory produced by `set_len` in `new`.
unsafe {
inner.set_len(length);
}
}
}
}
impl<T> Gemino<T> {
/// Creates a channel with room for `buffer_size` elements.
///
/// # Errors
/// `BufferTooSmall` if `buffer_size == 0`; `BufferTooBig` if it exceeds
/// `isize::MAX`.
#[allow(clippy::uninit_vec)]
pub(crate) fn new(buffer_size: usize) -> Result<Arc<Self>, ChannelError> {
if buffer_size < 1 {
return Err(ChannelError::BufferTooSmall);
}
if buffer_size > isize::MAX as usize {
return Err(ChannelError::BufferTooBig);
}
let mut inner = Box::new(Vec::with_capacity(buffer_size));
unsafe {
// Deliberately expose uninitialized cells so `send` can index any
// slot immediately; `Drop` shrinks `len` back so destructors never
// run on cells that were never written. NOTE(review): `set_len`
// over uninitialized `T` is only defensible because the cells are
// written via `copy_nonoverlapping` before first use — confirm
// with Miri.
inner.set_len(buffer_size);
}
// Leak the Vec into a raw pointer; ownership is reclaimed in `Drop`.
let inner = Box::into_raw(inner);
let mut cell_locks;
{
cell_locks = Vec::new();
cell_locks.resize_with(buffer_size, Default::default);
}
Ok(Arc::new(Self {
inner,
write_head: AtomicIsize::new(0),
read_head: AtomicIsize::new(-1),
capacity: buffer_size as isize,
event: event_listener::Event::new(),
closed: AtomicBool::new(false),
cell_locks,
}))
}
/// Oldest id that may still be in the buffer: once `write_head` passes
/// `capacity`, writers may have overwritten everything older than
/// `write_head - capacity` (conservative, since `write_head` is bumped
/// before the write completes).
#[inline]
pub fn oldest(&self) -> isize {
let head = self.write_head.load(Ordering::Acquire);
if head < self.capacity {
0
} else {
head - self.capacity
}
}
/// Number of elements the buffer holds.
pub fn capacity(&self) -> usize {
self.capacity as usize
}
/// Ok while the channel is open, `Err(Closed)` afterwards.
#[inline(always)]
fn closed(&self) -> Result<(), ChannelError> {
if self.closed.load(Ordering::Relaxed) {
return Err(ChannelError::Closed);
}
Ok(())
}
/// True once `close` has been called.
#[inline(always)]
pub fn is_closed(&self) -> bool {
self.closed().is_err()
}
/// Closes the channel and wakes every waiting reader so they can observe
/// the closed state.
pub fn close(&self) {
self.closed.store(true, Ordering::Release);
self.event.notify(usize::MAX);
}
}
// Default implementation for any `T: Clone`. The `default fn`s make this the
// base of a specialization pair (see the `T: Copy` impl below); this requires
// the nightly `specialization`/`min_specialization` feature.
impl<T> Channel<T> for Gemino<T>
where
T: Clone,
{
// Clones the value for `id` while holding that cell's read lock, so a
// concurrent writer cannot replace the value mid-clone.
default fn try_get(&self, id: usize) -> Result<T, ChannelError> {
if id > isize::MAX as usize {
return Err(ChannelError::InvalidIndex);
}
let id = id as isize;
let index = id % self.capacity;
let latest_committed_id = self.read_head.load(Ordering::Acquire);
if latest_committed_id < 0 || id > latest_committed_id {
// Not written yet — but report `Closed` instead if the channel is
// shut, since the value will now never arrive.
self.closed()?;
return Err(ChannelError::IDNotYetWritten);
}
let _read_lock = self.cell_locks[index as usize].read();
// Re-check staleness *after* taking the lock: a writer may have lapped
// this cell between the `read_head` load and lock acquisition.
let oldest = self.oldest();
if id < oldest {
return Err(ChannelError::IdTooOld(oldest));
}
unsafe { Ok((*self.inner)[index as usize].clone()) }
}
// Clones the most recently committed value under its cell lock.
default fn get_latest(&self) -> Result<(T, isize), ChannelError> {
self.closed()?;
let latest_committed_id = self.read_head.load(Ordering::Acquire);
if latest_committed_id < 0 {
return Err(ChannelError::IDNotYetWritten);
}
let index = (latest_committed_id % self.capacity) as usize;
let _read_lock = self.cell_locks[index].read();
// If the writer lapped the whole buffer before we got the lock, the
// channel is being overwritten faster than we can read it.
let oldest = self.oldest();
if latest_committed_id < oldest {
return Err(ChannelError::Overloaded);
}
unsafe { Ok(((*self.inner)[index].clone(), latest_committed_id)) }
}
// Copies `from_id..=latest` into `into`, holding the read lock of the
// *oldest* cell being read so writers cannot overwrite the range under us.
default fn read_batch_from(
&self,
from_id: usize,
into: &mut Vec<T>,
) -> Result<(isize, isize), ChannelError> {
if from_id > isize::MAX as usize {
return Err(ChannelError::InvalidIndex);
}
let mut from_id = from_id as isize;
let latest_committed_id = self.read_head.load(Ordering::Acquire);
if latest_committed_id < from_id {
self.closed()?;
return Err(ChannelError::IDNotYetWritten);
}
// Clamp the start forward if those ids have already been overwritten.
let mut oldest = self.oldest();
if from_id < oldest {
from_id = oldest
}
let mut start_idx = (from_id % self.capacity) as usize;
// `end_idx` is exclusive; `end_idx <= start_idx` signals wrap-around
// and is handled inside `batch`.
let end_idx = (latest_committed_id % self.capacity) as usize + 1;
let mut _read_lock;
unsafe {
// SAFETY: start_idx < capacity == cell_locks.len() by construction.
_read_lock = self.cell_locks.get_unchecked(start_idx).read();
}
// Lock-then-revalidate loop: each time the writer laps our start cell
// before we lock it, chase forward to the new oldest id and retry.
oldest = self.oldest();
while from_id < oldest {
from_id = oldest;
start_idx = (from_id % self.capacity) as usize;
let new_lock;
unsafe {
new_lock = self.cell_locks.get_unchecked(start_idx).read();
}
_read_lock = new_lock;
oldest = self.oldest();
}
if from_id > latest_committed_id {
// The writer overwrote the entire requested range before we could
// pin any of it.
return Err(ChannelError::Overloaded);
}
self.batch(into, start_idx, end_idx);
Ok((from_id, latest_committed_id))
}
}
// Specialization for `T: Copy`: reads are done optimistically without taking
// cell locks, then validated against `oldest()` afterwards and discarded if a
// writer lapped us. NOTE(review): this reads cells concurrently with writers,
// relying on `Copy` reads being harmless even when torn; strictly this is a
// data race under the Rust memory model — worth confirming with Miri/loom.
impl<T> Channel<T> for Gemino<T>
where
T: Copy,
{
// Copies the value out first, then checks it wasn't overwritten meanwhile.
fn try_get(&self, id: usize) -> Result<T, ChannelError> {
if id > isize::MAX as usize {
return Err(ChannelError::InvalidIndex);
}
let id = id as isize;
let index = id % self.capacity;
let latest_committed_id = self.read_head.load(Ordering::Acquire);
if latest_committed_id < 0 || id > latest_committed_id {
self.closed()?;
return Err(ChannelError::IDNotYetWritten);
}
let result;
unsafe {
// Optimistic unsynchronized copy; validated below.
result = (*self.inner)[index as usize];
}
// Validate *after* the copy: if the writer lapped this cell the copy
// may be a newer value, so discard it.
let oldest = self.oldest();
if id < oldest {
return Err(ChannelError::IdTooOld(oldest));
}
Ok(result)
}
// Same optimistic pattern for the newest committed value.
fn get_latest(&self) -> Result<(T, isize), ChannelError> {
self.closed()?;
let latest_committed_id = self.read_head.load(Ordering::Acquire);
if latest_committed_id < 0 {
return Err(ChannelError::IDNotYetWritten);
}
let result;
unsafe {
result = Ok((
(*self.inner)[(latest_committed_id % self.capacity) as usize],
latest_committed_id,
))
}
// Discard if a full buffer of writes landed while we copied.
if latest_committed_id < self.oldest() {
return Err(ChannelError::Overloaded);
}
result
}
// Batch-copy without locks, then trim any leading entries the writer
// overwrote during the copy.
fn read_batch_from(
&self,
from_id: usize,
into: &mut Vec<T>,
) -> Result<(isize, isize), ChannelError> {
if from_id > isize::MAX as usize {
return Err(ChannelError::InvalidIndex);
}
let mut from_id = from_id as isize;
let latest_committed_id = self.read_head.load(Ordering::Acquire);
if latest_committed_id < from_id {
self.closed()?;
return Err(ChannelError::IDNotYetWritten);
}
let oldest = self.oldest();
if from_id < oldest {
from_id = oldest
}
let start_idx = (from_id % self.capacity) as usize;
// Exclusive end; `end_idx <= start_idx` means the range wraps.
let end_idx = (latest_committed_id % self.capacity) as usize + 1;
self.batch(into, start_idx, end_idx);
// Post-validate: drop the prefix that was overwritten mid-copy, since
// those slots now hold values newer than the ids we report.
let oldest = self.oldest();
if from_id < oldest {
let num_to_remove = (oldest - from_id) as usize;
into.drain(0..num_to_remove);
from_id = oldest;
}
Ok((from_id, latest_committed_id))
}
}
impl<T> Gemino<T>
where
T: Clone,
{
/// Writes `val` into the next slot and returns its id.
///
/// Commit protocol: `write_head` is claimed first (making `oldest()`
/// conservative), the cell is overwritten under its write lock, then the
/// CAS spin publishes ids strictly in order via `read_head`.
///
/// # Errors
/// `Closed` if the channel has been closed.
pub fn send(&self, val: T) -> Result<isize, ChannelError> {
if self.closed.load(Ordering::Relaxed) {
return Err(ChannelError::Closed);
}
let id = self.write_head.fetch_add(1, Ordering::Release);
let index = id % self.capacity;
let pos;
unsafe {
pos = (*self.inner).get_unchecked_mut(index as usize);
}
// Declared outside the lock scope so the old value's destructor runs
// only after the cell lock has been released.
let mut _old_value: T;
{
let _write_lock;
unsafe {
// SAFETY: index < capacity == cell_locks.len().
_write_lock = self.cell_locks.get_unchecked(index as usize).write();
}
if id < self.capacity {
// First lap: the cell is still uninitialized memory from `new`,
// so we must not drop its "old" contents — raw-copy in and
// forget the source instead of using `replace`.
unsafe {
std::ptr::copy_nonoverlapping(&val, pos, 1);
}
std::mem::forget(val);
} else {
// Later laps: the cell holds a real value; swap it out so it
// drops (after the lock scope, see above).
_old_value = std::mem::replace(pos, val);
}
}
// Spin until the previous id commits, then publish ours — this
// serializes `read_head` updates in id order even though writers race.
while self
.read_head
.compare_exchange_weak(id - 1, id, Ordering::Release, Ordering::Acquire)
.is_err()
{}
self.event.notify(usize::MAX);
Ok(id)
}
/// Copies the ring range `[start_idx, end_idx)` (wrapping when
/// `end_idx <= start_idx`) onto the end of `into`.
fn batch(&self, into: &mut Vec<T>, start_idx: usize, end_idx: usize) {
if end_idx > start_idx {
// Contiguous case: one slice copy.
let num_elements = end_idx - start_idx;
into.reserve(num_elements);
let res_start = into.len();
unsafe {
// Pre-extend `len`, then clone directly into the new tail —
// avoids a per-element push. NOTE(review): if `T::clone`
// panics, `into` is left with uninitialized elements in its
// (already extended) tail.
into.set_len(res_start + num_elements);
let slice_to_copy = &((*self.inner)[start_idx..(end_idx)]);
into[res_start..(res_start + num_elements)].clone_from_slice(slice_to_copy);
}
} else {
// Wrapped case: copy [start..capacity) then [0..end_idx).
let num_elements = end_idx + (self.capacity as usize - start_idx);
into.reserve(num_elements);
let mut res_start = into.len();
unsafe {
into.set_len(res_start + num_elements);
let slice_to_copy = &((*self.inner)[start_idx..]);
into[res_start..(res_start + slice_to_copy.len())].clone_from_slice(slice_to_copy);
res_start += slice_to_copy.len();
let slice_to_copy = &((*self.inner)[..end_idx]);
into[res_start..(res_start + slice_to_copy.len())].clone_from_slice(slice_to_copy);
}
}
}
/// Blocks until at least `num` values are available starting at `from_id`,
/// then batch-reads them.
pub fn read_at_least(
&self,
num: usize,
mut from_id: usize,
into: &mut Vec<T>,
) -> Result<(isize, isize), ChannelError> {
if num > self.capacity() {
return Err(ChannelError::ReadTooLarge);
}
let original_from = from_id;
loop {
// Listen *before* checking, so a notify between check and wait
// isn't lost.
let listener = self.event.listen();
let oldest = self.oldest();
if from_id < oldest as usize {
from_id = oldest as usize;
}
let latest = self.read_head.load(Ordering::Acquire);
// NOTE(review): if nothing has been written yet, `latest` is -1 and
// `latest as usize` underflows — this arithmetic overflows in debug
// builds; confirm callers never race the very first write.
let num_available = (latest as usize) - from_id + 1;
if num_available >= num {
// NOTE(review): passes `original_from` here while the async
// variant passes the clamped `from_id` — confirm which is
// intended.
return self.read_batch_from(original_from, into);
}
listener.wait();
}
}
/// Async variant of [`read_at_least`]; awaits the event instead of
/// blocking the thread.
pub async fn read_at_least_async(
&self,
num: usize,
mut from_id: usize,
into: &mut Vec<T>,
) -> Result<(isize, isize), ChannelError> {
if num > self.capacity() {
return Err(ChannelError::ReadTooLarge);
}
loop {
let listener = self.event.listen();
let oldest = self.oldest();
if from_id < oldest as usize {
from_id = oldest as usize;
}
let latest = self.read_head.load(Ordering::Acquire);
// NOTE(review): same `-1 as usize` underflow risk as the sync
// version when the channel is still empty.
let num_available = (latest as usize) - from_id + 1;
if num_available >= num {
return self.read_batch_from(from_id, into);
}
listener.await;
}
}
/// Blocks until the value with `id` has been written, then returns it.
pub fn get_blocking(&self, id: usize) -> Result<T, ChannelError> {
// Fast path: return immediately unless the value simply isn't there yet.
let immediate = self.try_get(id);
if let Err(err) = &immediate {
if !matches!(err, ChannelError::IDNotYetWritten) {
return immediate;
}
} else {
return immediate;
}
while self.read_head.load(Ordering::Acquire) < id as isize {
// Listen, re-check (to avoid sleeping through a notify that raced
// the first check), then wait.
let listener = self.event.listen();
if self.read_head.load(Ordering::Acquire) < id as isize {
listener.wait();
}
self.closed()?
}
self.try_get(id)
}
/// Like [`get_blocking`] but gives up with `Timeout` at `before`.
pub fn get_blocking_before(
&self,
id: usize,
before: std::time::Instant,
) -> Result<T, ChannelError> {
let immediate = self.try_get(id);
if let Err(err) = &immediate {
if !matches!(err, ChannelError::IDNotYetWritten) {
return immediate;
}
} else {
return immediate;
}
while self.read_head.load(Ordering::Acquire) < id as isize {
let listener = self.event.listen();
// `wait_deadline` returns false when the deadline passed without
// a notification.
if self.read_head.load(Ordering::Acquire) < id as isize
&& !listener.wait_deadline(before)
{
return Err(ChannelError::Timeout);
}
self.closed()?;
}
self.try_get(id)
}
/// Async variant of [`get_blocking`].
pub async fn get(&self, id: usize) -> Result<T, ChannelError> {
let immediate = self.try_get(id);
if let Err(err) = &immediate {
if !matches!(err, ChannelError::IDNotYetWritten) {
return immediate;
}
} else {
return immediate;
}
while self.read_head.load(Ordering::Acquire) < id as isize {
let listener = self.event.listen();
if self.read_head.load(Ordering::Acquire) < id as isize {
listener.await;
}
self.closed()?;
}
// NOTE(review): `unwrap` assumes the value can't have been lapped
// between the loop exit and this read — under heavy write load
// `try_get` could return `IdTooOld` and this would panic; confirm.
Ok(self.try_get(id).unwrap())
}
/// Waits for the next committed value and returns it with its id.
/// NOTE(review): `unwrap` panics if the channel is closed (or still empty)
/// when the event fires — consider propagating the error instead.
pub async fn read_next(&self) -> (T, isize) {
self.event.listen().await;
self.get_latest().unwrap()
}
}