#![feature(min_specialization)]
#[cfg(test)]
mod async_tests;
#[allow(dead_code)]
mod channel;
#[cfg(test)]
mod tests;
use channel::*;
use std::fmt::Debug;
use std::ops::Add;
use std::sync::Arc;
use std::time::Instant;
use thiserror::Error;
/// Errors surfaced by the public [`Sender`] / [`Receiver`] API and by [`channel`].
#[derive(Error, Debug)]
pub enum Error {
/// A receive call asked for a message the channel could no longer supply.
/// The payload is the number of messages that were skipped; the receiver's
/// position has already been moved forward when this is returned.
#[error("The receiver has lagged behind and missed {0} messages")]
Lagged(usize),
/// Nothing new was available: either the requested message has not been
/// written yet, or a timed receive ran out of time.
#[error("There is no new data to retrieve from the channel")]
NoNewData,
/// Returned by [`channel`] when the requested buffer size is rejected as too small.
#[error("channel buffer size must be at least 1")]
BufferTooSmall,
/// Returned by [`channel`] when the requested buffer size is rejected as too large.
#[error("channel buffer size cannot exceed isize::MAX")]
BufferTooBig,
/// Returned when fetching the latest value and the channel reports it is overloaded.
#[error("channel is being written so fast that a valid read was not possible")]
Overloaded,
/// The channel has been closed.
#[error("the channel has been closed")]
Closed,
/// Returned by the `recv_at_least*` methods when the channel rejects the
/// requested element count as too large.
#[error("request cannot be filled as the channel capacity is smaller than the requested number of elements")]
ReadTooLarge,
}
/// Receiving handle for a channel.
///
/// Each receiver keeps its own read position (`next_id`), so clones of a
/// receiver advance independently of each other.
#[derive(Debug)]
pub struct Receiver<T> {
/// Channel state shared (via `Arc`) with the senders and other receivers.
inner: Arc<Gemino<T>>,
/// Id of the message this receiver will request on its next receive call.
next_id: usize,
}
impl<T> Receiver<T> {
    /// Consumes the receiver and returns it with its read position rewound to id 0.
    ///
    /// The channel handle itself is carried over unchanged.
    pub fn reset(self) -> Self {
        Self { next_id: 0, ..self }
    }

    /// Reports whether the underlying channel considers itself closed.
    pub fn is_closed(&self) -> bool {
        self.inner.is_closed()
    }
}
impl<T> Receiver<T>
where
T: Clone,
{
pub fn recv(&mut self) -> Result<T, Error> {
let id = self.next_id;
match self.inner.get_blocking(id) {
Ok(value) => {
self.next_id += 1;
Ok(value)
}
Err(err) => match err {
ChannelError::IdTooOld(oldest) => {
self.next_id = oldest as usize;
let missed = self.next_id - id;
Err(Error::Lagged(missed))
}
ChannelError::Closed => Err(Error::Closed),
_ => panic!("unexpected error while receving from channel: {err}"),
},
}
}
pub fn recv_before(&mut self, timeout: core::time::Duration) -> Result<T, Error> {
let id = self.next_id;
match self
.inner
.get_blocking_before(id, Instant::now().add(timeout))
{
Ok(value) => {
self.next_id += 1;
Ok(value)
}
Err(err) => match err {
ChannelError::IdTooOld(oldest) => {
self.next_id = oldest as usize;
let missed = self.next_id - id;
Err(Error::Lagged(missed))
}
ChannelError::Closed => Err(Error::Closed),
ChannelError::Timeout => Err(Error::NoNewData),
_ => panic!("unexpected error while receving from channel: {err}"),
},
}
}
pub async fn recv_async(&mut self) -> Result<T, Error> {
let id = self.next_id;
match self.inner.get(id).await {
Ok(value) => {
self.next_id += 1;
Ok(value)
}
Err(err) => match err {
ChannelError::IdTooOld(oldest) => {
self.next_id = oldest as usize;
let missed = self.next_id - id;
Err(Error::Lagged(missed))
}
ChannelError::Closed => Err(Error::Closed),
_ => panic!("unexpected error while receving from channel: {err}"),
},
}
}
pub fn try_recv_many(&mut self, result: &mut Vec<T>) -> Result<usize, Error> {
let (first_id, last_id) =
self.inner
.read_batch_from(self.next_id, result)
.or_else(|err| match err {
ChannelError::IDNotYetWritten => Err(Error::NoNewData),
ChannelError::Closed => Err(Error::Closed),
_ => panic!("unexpected error while performing bulk read: {err}"),
})?;
let mut missed = 0;
if first_id as usize > self.next_id {
missed = first_id as usize - self.next_id
}
self.next_id = last_id as usize + 1;
Ok(missed)
}
pub fn recv_at_least(&mut self, num: usize, result: &mut Vec<T>) -> Result<usize, Error> {
let (first_id, last_id) = self
.inner
.read_at_least(num, self.next_id, result)
.or_else(|err| match err {
ChannelError::IDNotYetWritten => Err(Error::NoNewData),
ChannelError::Closed => Err(Error::Closed),
ChannelError::ReadTooLarge => Err(Error::ReadTooLarge),
_ => panic!("unexpected error while performing bulk read: {err}"),
})?;
let mut missed = 0;
if first_id as usize > self.next_id {
missed = first_id as usize - self.next_id
}
self.next_id = last_id as usize + 1;
Ok(missed)
}
pub async fn recv_at_least_async(
&mut self,
num: usize,
result: &mut Vec<T>,
) -> Result<usize, Error> {
let (first_id, last_id) = self
.inner
.read_at_least_async(num, self.next_id, result)
.await
.or_else(|err| match err {
ChannelError::IDNotYetWritten => Err(Error::NoNewData),
ChannelError::Closed => Err(Error::Closed),
ChannelError::ReadTooLarge => Err(Error::ReadTooLarge),
_ => panic!("unexpected error while performing bulk read: {err}"),
})?;
let mut missed = 0;
if first_id as usize > self.next_id {
missed = first_id as usize - self.next_id
}
self.next_id = last_id as usize + 1;
Ok(missed)
}
pub fn try_recv(&mut self) -> Result<T, Error> {
let id = self.next_id;
match self.inner.try_get(id) {
Ok(value) => {
self.next_id += 1;
Ok(value)
}
Err(err) => {
match err {
ChannelError::IdTooOld(oldest_valid_id) => {
self.next_id = oldest_valid_id as usize;
let missed = oldest_valid_id as usize - id;
Err(Error::Lagged(missed))
}
ChannelError::Closed => Err(Error::Closed),
ChannelError::IDNotYetWritten => Err(Error::NoNewData),
_ => panic!("unexpected error while receiving value from channel: {err}"),
}
}
}
}
fn try_latest(&mut self) -> Result<(T, isize), Error> {
self.inner.get_latest().or_else(|err| match err {
ChannelError::Overloaded => Err(Error::Overloaded),
ChannelError::IDNotYetWritten => Err(Error::NoNewData),
ChannelError::Closed => Err(Error::Closed),
_ => panic!("unexpected data while getting latest value from channel: {err}"),
})
}
pub fn latest(&mut self) -> Result<T, Error> {
let (value, id) = self.try_latest()?;
if (id as usize) < self.next_id {
return self.recv();
}
self.next_id = id as usize + 1;
Ok(value)
}
pub async fn latest_async(&mut self) -> Result<T, Error> {
let (value, id) = self.try_latest()?;
if (id as usize) < self.next_id {
return self.recv_async().await;
}
self.next_id = id as usize + 1;
Ok(value)
}
}
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
next_id: self.next_id,
}
}
}
impl<T> From<Arc<Gemino<T>>> for Receiver<T> {
fn from(ring_buffer: Arc<Gemino<T>>) -> Self {
Self {
inner: ring_buffer,
next_id: 0,
}
}
}
/// Sending handle for a channel.
///
/// Holds only a shared reference to the channel state; it keeps no position of its own.
#[derive(Debug)]
pub struct Sender<T> {
/// Channel state shared (via `Arc`) with the receivers and other senders.
inner: Arc<Gemino<T>>,
}
impl<T> Sender<T> {
/// Closes the underlying channel by calling `Gemino::close`.
///
/// The channel state is shared through an `Arc`, so this acts on the same
/// channel that every handle created from it refers to.
pub fn close(&self) {
self.inner.close();
}
/// Reports whether the underlying channel considers itself closed.
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
}
}
impl<T> Sender<T>
where
T: Clone,
{
pub fn send(&self, val: T) -> Result<(), Error> {
if let Err(err) = self.inner.send(val) {
return match err {
ChannelError::Closed => Err(Error::Closed),
_ => panic!("unexpected error attempting to send to channel"),
};
}
Ok(())
}
pub fn subscribe(&self) -> Receiver<T> {
Receiver::from(self.inner.clone())
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T> From<Arc<Gemino<T>>> for Sender<T> {
fn from(ring_buffer: Arc<Gemino<T>>) -> Self {
Self { inner: ring_buffer }
}
}
impl<T> From<Receiver<T>> for Sender<T> {
fn from(receiver: Receiver<T>) -> Self {
Self {
inner: receiver.inner,
}
}
}
pub fn channel<T>(buffer_size: usize) -> Result<(Sender<T>, Receiver<T>), Error> {
let chan = Gemino::new(buffer_size).or_else(|err| match err {
ChannelError::BufferTooSmall => Err(Error::BufferTooSmall),
ChannelError::BufferTooBig => Err(Error::BufferTooBig),
_ => panic!("unexpected error while creating a new channel: {err}"),
})?;
let sender = Sender::from(chan.clone());
let receiver = Receiver::from(chan);
Ok((sender, receiver))
}