pub mod channel;
mod error;
pub mod io;
pub mod process;
pub use error::{ClosedMarketFailure, ConsumeError, ProduceError, Recall};
use {
core::{
cell::RefCell,
fmt::{self, Debug, Display},
marker::PhantomData,
sync::atomic::{AtomicBool, Ordering},
},
crossbeam_queue::SegQueue,
fehler::{throw, throws},
never::Never,
std::error::Error,
};
pub trait Consumer {
type Good;
type Failure: Error;
#[throws(ConsumeError<Self::Failure>)]
fn consume(&self) -> Self::Good;
#[inline]
#[throws(ConsumeError<Self::Failure>)]
fn consume_all(&self) -> Vec<Self::Good> {
let mut goods = Vec::new();
loop {
match self.consume() {
Ok(good) => {
goods.push(good);
}
Err(error) => {
if goods.is_empty() {
throw!(error);
} else {
break goods;
}
}
}
}
}
#[inline]
#[throws(Self::Failure)]
fn demand(&self) -> Self::Good {
loop {
match self.consume() {
Ok(good) => {
break good;
}
Err(error) => {
if let ConsumeError::Failure(failure) = error {
throw!(failure);
}
}
}
}
}
#[inline]
fn adapt<G, F>(self) -> Adapter<Self, G, F>
where
Self: Sized,
{
Adapter::new(self)
}
}
#[allow(clippy::missing_inline_in_public_items)]
pub trait Producer {
type Good;
type Failure: Error;
#[allow(redundant_semicolons, unused_variables)]
#[throws(ProduceError<Self::Failure>)]
fn produce(&self, good: Self::Good);
#[throws(Recall<Self::Good, Self::Failure>)]
fn produce_or_recall(&self, good: Self::Good)
where
Self::Good: Clone + Debug + Display,
{
self.produce(good.clone())
.map_err(|error| Recall::new(good, error))?
}
#[inline]
#[throws(Self::Failure)]
fn force(&self, mut good: Self::Good)
where
Self::Good: Clone + Debug + Display,
{
loop {
match self.produce_or_recall(good) {
Ok(()) => break,
Err(recall) => {
good = recall.good_if_full()?;
}
}
}
}
#[throws(Self::Failure)]
fn force_all(&self, goods: Vec<Self::Good>)
where
Self::Good: Clone + Debug + Display,
{
for good in goods {
self.force(good)?
}
}
}
#[derive(Debug)]
pub struct Adapter<C, G, F> {
consumer: C,
good: PhantomData<G>,
failure: PhantomData<F>,
}
impl<C, G, F> Adapter<C, G, F> {
const fn new(consumer: C) -> Self {
Self {
consumer,
good: PhantomData,
failure: PhantomData,
}
}
}
impl<C, G, F> Consumer for Adapter<C, G, F>
where
C: Consumer,
G: From<C::Good>,
F: From<C::Failure> + Error,
{
type Good = G;
type Failure = F;
#[inline]
#[throws(ConsumeError<Self::Failure>)]
fn consume(&self) -> Self::Good {
self.consumer
.consume()
.map_err(|error| match error {
ConsumeError::EmptyStock => ConsumeError::EmptyStock,
ConsumeError::Failure(failure) => {
ConsumeError::Failure(Self::Failure::from(failure))
}
})
.map(Self::Good::from)?
}
}
#[derive(Default)]
pub struct Collector<G, E> {
consumers: Vec<Box<dyn Consumer<Good = G, Failure = E>>>,
}
impl<G, E> Collector<G, E> {
#[must_use]
#[inline]
pub fn new() -> Self {
Self {
consumers: Vec::new(),
}
}
#[inline]
pub fn convert_into_and_push<C>(&mut self, consumer: C)
where
C: Consumer + 'static,
G: From<C::Good> + 'static,
E: From<C::Failure> + Error + 'static,
{
self.push(consumer.adapt());
}
#[inline]
pub fn push<C>(&mut self, consumer: C)
where
C: Consumer<Good = G, Failure = E> + 'static,
E: Error,
{
self.consumers.push(Box::new(consumer));
}
}
impl<G, E> Consumer for Collector<G, E>
where
E: Error,
{
type Good = G;
type Failure = E;
#[inline]
#[throws(ConsumeError<Self::Failure>)]
fn consume(&self) -> Self::Good {
let mut result = Err(ConsumeError::EmptyStock);
for consumer in &self.consumers {
result = match consumer.consume() {
Ok(good) => Ok(good),
Err(error) => match error {
ConsumeError::EmptyStock => continue,
ConsumeError::Failure(_) => Err(error),
},
};
break;
}
result?
}
}
impl<G, E> Debug for Collector<G, E> {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Collector {{ .. }}")
}
}
pub trait StripFrom<G>
where
Self: Sized,
{
fn strip_from(good: &G) -> Vec<Self>;
}
#[derive(Debug)]
pub struct StrippingProducer<G, P>
where
P: Producer,
<P as Producer>::Good: Debug,
{
#[doc(hidden)]
phantom: PhantomData<G>,
producer: P,
parts: RefCell<Vec<<P as Producer>::Good>>,
}
impl<G, P> StrippingProducer<G, P>
where
P: Producer,
<P as Producer>::Good: Debug,
{
#[inline]
pub fn new(producer: P) -> Self {
Self {
producer,
phantom: PhantomData,
parts: RefCell::new(Vec::new()),
}
}
}
impl<G, P> Producer for StrippingProducer<G, P>
where
P: Producer,
G: Debug + Display,
<P as Producer>::Good: StripFrom<G> + Clone + Debug,
<P as Producer>::Failure: Error,
{
type Good = G;
type Failure = <P as Producer>::Failure;
#[inline]
#[throws(ProduceError<Self::Failure>)]
fn produce(&self, good: Self::Good) {
let parts = <<P as Producer>::Good>::strip_from(&good);
for part in parts {
if let Err(error) = self.producer.produce(part) {
throw!(error);
}
}
}
}
#[derive(Debug)]
pub struct StrippingConsumer<C, P> {
consumer: C,
parts: SegQueue<P>,
}
impl<C, P> StrippingConsumer<C, P>
where
C: Consumer,
P: StripFrom<<C as Consumer>::Good>,
{
#[inline]
pub fn new(consumer: C) -> Self {
Self {
consumer,
parts: SegQueue::new(),
}
}
fn strip(&self) -> ConsumeError<<C as Consumer>::Failure> {
let error;
loop {
match self.consumer.consume() {
Ok(composite) => {
for part in P::strip_from(&composite) {
self.parts.push(part);
}
}
Err(e) => {
error = e;
break;
}
}
}
error
}
}
impl<C, P> Consumer for StrippingConsumer<C, P>
where
C: Consumer,
P: StripFrom<<C as Consumer>::Good> + Debug,
{
type Good = P;
type Failure = <C as Consumer>::Failure;
#[inline]
#[throws(ConsumeError<Self::Failure>)]
fn consume(&self) -> Self::Good {
let error = self.strip();
if let Ok(part) = self.parts.pop() {
part
} else {
throw!(error);
}
}
}
#[derive(Copy, Clone, Debug)]
pub struct NonComposible;
impl<T> From<NonComposible> for ConsumeError<T>
where
T: Error,
{
#[inline]
fn from(_: NonComposible) -> Self {
Self::EmptyStock
}
}
pub trait ComposeFrom<G>
where
Self: Sized,
{
#[throws(NonComposible)]
fn compose_from(parts: &mut Vec<G>) -> Self;
}
#[derive(Debug)]
pub struct ComposingConsumer<C, G>
where
C: Consumer,
<C as Consumer>::Good: Debug,
{
consumer: C,
buffer: RefCell<Vec<<C as Consumer>::Good>>,
#[doc(hidden)]
phantom: PhantomData<G>,
}
impl<C, G> ComposingConsumer<C, G>
where
C: Consumer,
<C as Consumer>::Good: Debug,
{
#[inline]
pub fn new(consumer: C) -> Self {
Self {
consumer,
buffer: RefCell::new(Vec::new()),
phantom: PhantomData,
}
}
}
impl<C, G> Consumer for ComposingConsumer<C, G>
where
C: Consumer,
G: ComposeFrom<<C as Consumer>::Good>,
<C as Consumer>::Good: Debug,
{
type Good = G;
type Failure = <C as Consumer>::Failure;
#[inline]
#[throws(ConsumeError<Self::Failure>)]
fn consume(&self) -> Self::Good {
let good = self.consumer.consume()?;
let mut buffer = self.buffer.borrow_mut();
buffer.push(good);
G::compose_from(&mut buffer)?
}
}
pub trait Inspector {
type Good;
fn allows(&self, good: &Self::Good) -> bool;
}
#[derive(Debug)]
pub struct VigilantConsumer<C, I> {
consumer: C,
inspector: I,
}
impl<C, I> VigilantConsumer<C, I> {
#[inline]
pub const fn new(consumer: C, inspector: I) -> Self {
Self {
consumer,
inspector,
}
}
}
impl<C, I> Consumer for VigilantConsumer<C, I>
where
C: Consumer,
I: Inspector<Good = <C as Consumer>::Good> + Debug,
{
type Good = <C as Consumer>::Good;
type Failure = <C as Consumer>::Failure;
#[inline]
#[throws(ConsumeError<Self::Failure>)]
fn consume(&self) -> Self::Good {
let mut input;
loop {
input = self.consumer.consume()?;
if self.inspector.allows(&input) {
break;
}
}
input
}
}
#[derive(Debug)]
pub struct ApprovedProducer<P, I> {
producer: P,
inspector: I,
}
impl<P, I> ApprovedProducer<P, I> {
#[inline]
pub const fn new(producer: P, inspector: I) -> Self {
Self {
producer,
inspector,
}
}
}
impl<P, I> Producer for ApprovedProducer<P, I>
where
P: Producer,
<P as Producer>::Good: Debug + Display,
I: Inspector<Good = <P as Producer>::Good>,
{
type Good = <P as Producer>::Good;
type Failure = <P as Producer>::Failure;
#[inline]
#[throws(ProduceError<Self::Failure>)]
fn produce(&self, good: Self::Good) {
if self.inspector.allows(&good) {
self.producer.produce(good)?
}
}
}
#[derive(Debug)]
pub struct UnlimitedQueue<G> {
queue: SegQueue<G>,
is_closed: AtomicBool,
}
impl<G> UnlimitedQueue<G> {
#[must_use]
#[inline]
pub fn new() -> Self {
Self::default()
}
#[inline]
pub fn close(&self) {
self.is_closed.store(true, Ordering::Relaxed);
}
}
impl<G> Consumer for UnlimitedQueue<G>
where
G: Debug,
{
type Good = G;
type Failure = ClosedMarketFailure;
#[inline]
#[throws(ConsumeError<Self::Failure>)]
fn consume(&self) -> Self::Good {
match self.queue.pop() {
Ok(good) => good,
Err(_) => {
if self.is_closed.load(Ordering::Relaxed) {
throw!(ConsumeError::Failure(ClosedMarketFailure));
} else {
throw!(ConsumeError::EmptyStock);
}
}
}
}
}
impl<G> Default for UnlimitedQueue<G> {
#[inline]
fn default() -> Self {
Self {
queue: SegQueue::new(),
is_closed: AtomicBool::new(false),
}
}
}
impl<G> Producer for UnlimitedQueue<G>
where
G: Debug + Display,
{
type Good = G;
type Failure = ClosedMarketFailure;
#[inline]
#[throws(ProduceError<Self::Failure>)]
fn produce(&self, good: Self::Good) {
if self.is_closed.load(Ordering::Relaxed) {
throw!(ProduceError::Failure(ClosedMarketFailure));
} else {
self.queue.push(good);
}
}
}
#[derive(Debug, Default)]
pub struct PermanentQueue<G> {
queue: SegQueue<G>,
}
impl<G> PermanentQueue<G> {
#[must_use]
#[inline]
pub fn new() -> Self {
Self {
queue: SegQueue::new(),
}
}
}
impl<G> Consumer for PermanentQueue<G>
where
G: Debug,
{
type Good = G;
type Failure = Never;
#[inline]
#[throws(ConsumeError<Self::Failure>)]
fn consume(&self) -> Self::Good {
self.queue.pop().map_err(|_| ConsumeError::EmptyStock)?
}
}
impl<G> Producer for PermanentQueue<G>
where
G: Debug + Display,
{
type Good = G;
type Failure = Never;
#[inline]
#[throws(ProduceError<Self::Failure>)]
fn produce(&self, good: Self::Good) {
self.queue.push(good);
}
}