#![allow(
clippy::empty_enum,
clippy::implicit_return,
)]
pub mod channel;
use {
core::{
cell::RefCell,
fmt::Debug,
marker::PhantomData,
sync::atomic::{AtomicBool, Ordering},
},
crossbeam_queue::SegQueue,
std::{
io::{self, BufRead, Write},
sync::Mutex,
},
thiserror::Error as ThisError,
};
/// A source of goods that is polled through a shared reference.
///
/// Implementors only have to supply `consume`; `demand` and `goods` are
/// provided on top of it.
pub trait Consumer {
    /// The type of item handed out by this consumer.
    type Good;
    /// The type of failure reported when retrieving a good fails.
    type Error;

    /// Makes a single attempt to retrieve the next good.
    ///
    /// `Ok(Some(good))` carries a retrieved good, `Ok(None)` signals that no
    /// good was retrieved by this attempt (`demand` reacts to it by trying
    /// again) and `Err` reports a failure.
    fn consume(&self) -> Result<Option<Self::Good>, Self::Error>;

    /// Retrieves the next good, retrying `consume` until it yields either a
    /// good or an error.
    ///
    /// The retry is a plain loop without any waiting in between, so this
    /// spins for as long as `consume` keeps returning `Ok(None)`.
    #[inline]
    fn demand(&self) -> Result<Self::Good, Self::Error> {
        loop {
            match self.consume() {
                Ok(Some(good)) => return Ok(good),
                // Nothing retrieved this time; poll again.
                Ok(None) => {}
                Err(error) => return Err(error),
            }
        }
    }

    /// Creates a `GoodsIter` that borrows this consumer.
    #[inline]
    fn goods(&self) -> GoodsIter<'_, Self>
    where
        Self: Sized,
    {
        let consumer = self;
        GoodsIter { consumer }
    }
}
/// A sink that goods are handed to through a shared reference.
///
/// Implementors only have to supply `produce`; `produce_all` and `force` are
/// provided on top of it.
pub trait Producer {
    /// The type of item accepted by this producer.
    type Good;
    /// The type of failure reported when producing a good fails.
    type Error;

    /// Makes a single attempt to produce `good`.
    ///
    /// `Ok(None)` means the producer took the good. `Ok(Some(good))` hands a
    /// good back to the caller because it was not taken by this attempt
    /// (`force` reacts to it by trying again). `Err` reports a failure.
    fn produce(&self, good: Self::Good) -> Result<Option<Self::Good>, Self::Error>;

    /// Produces `goods` in order and returns the ones that were not produced.
    ///
    /// As soon as one good is handed back by `produce`, no further attempts
    /// are made: the returned vector holds that good followed by all goods
    /// after it, in their original order. An empty vector means every good
    /// was produced.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by `produce`; the goods that had not
    /// been produced yet are dropped in that case.
    #[inline]
    fn produce_all(&self, goods: Vec<Self::Good>) -> Result<Vec<Self::Good>, Self::Error> {
        let mut remaining = goods.into_iter();

        while let Some(good) = remaining.next() {
            if let Some(rejected) = self.produce(good)? {
                // The rejected good leads the leftovers; the untouched tail follows it.
                let mut leftovers = vec![rejected];
                leftovers.extend(remaining);
                return Ok(leftovers);
            }
        }

        Ok(Vec::new())
    }

    /// Produces `good`, retrying `produce` for as long as a good is handed back.
    ///
    /// The retry is a plain loop without any waiting in between.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by `produce`.
    #[inline]
    fn force(&self, good: Self::Good) -> Result<(), Self::Error> {
        let mut pending = good;

        loop {
            match self.produce(pending)? {
                Some(returned) => pending = returned,
                None => return Ok(()),
            }
        }
    }
}
/// An iterator over the goods of a borrowed `Consumer`.
#[derive(Debug)]
pub struct GoodsIter<'a, C: Consumer> {
    /// The consumer the goods are demanded from.
    consumer: &'a C,
}

impl<C: Consumer> Iterator for GoodsIter<'_, C> {
    type Item = <C as Consumer>::Good;

    /// Demands the next good from the consumer.
    ///
    /// Any error reported by the consumer is discarded and turned into
    /// `None`. The iterator keeps no state of its own, so a later call
    /// simply demands from the consumer again.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        match self.consumer.demand() {
            Ok(good) => Some(good),
            Err(_) => None,
        }
    }
}
/// Conversion of a borrowed good of type `G` into a list of `Self` values.
pub trait StripFrom<G>: Sized {
    /// Builds the `Self` values obtained from `good`.
    fn strip_from(good: &G) -> Vec<Self>;
}
#[derive(Debug, ThisError)]
pub enum StripError<T>
where
T: Debug,
{
#[error("")]
Lock,
#[error("")]
Error(T),
}
#[derive(Debug)]
pub struct StrippingProducer<G, P>
where
P: Producer,
<P as Producer>::Good: Debug,
{
#[doc(hidden)]
phantom: PhantomData<G>,
producer: P,
parts: Mutex<Vec<<P as Producer>::Good>>,
}
impl<G, P> StrippingProducer<G, P>
where
P: Producer,
<P as Producer>::Good: Debug,
{
#[inline]
pub fn new(producer: P) -> Self {
Self {
producer,
phantom: PhantomData,
parts: Mutex::new(Vec::new()),
}
}
}
impl<G, P> Producer for StrippingProducer<G, P>
where
P: Producer,
<P as Producer>::Good: StripFrom<G> + Clone + Debug,
<P as Producer>::Error: Debug,
{
type Good = G;
type Error = StripError<<P as Producer>::Error>;
#[inline]
fn produce(&self, good: Self::Good) -> Result<Option<Self::Good>, Self::Error> {
let mut parts = self.parts.lock().map_err(|_| Self::Error::Lock)?;
if parts.is_empty() {
*parts = <P as Producer>::Good::strip_from(&good);
}
*parts = self
.producer
.produce_all(parts.to_vec())
.map_err(Self::Error::Error)?;
Ok(if parts.is_empty() { None } else { Some(good) })
}
}
/// A consumer of parts `P` that are stripped from the composite goods of an
/// inner consumer `C`.
#[derive(Debug)]
pub struct StrippingConsumer<C, P> {
    /// The consumer the composite goods are taken from.
    consumer: C,
    /// Parts that were stripped but not handed out yet.
    parts: SegQueue<P>,
}

impl<C, P> StrippingConsumer<C, P>
where
    C: Consumer,
    P: StripFrom<<C as Consumer>::Good>,
{
    /// Creates a `StrippingConsumer` around `consumer` with no queued parts.
    #[inline]
    pub fn new(consumer: C) -> Self {
        let parts = SegQueue::new();

        Self { consumer, parts }
    }

    /// Consumes composite goods from the inner consumer until it yields
    /// `Ok(None)` or fails, queueing the parts of every good consumed.
    fn strip(&self) -> Result<(), <C as Consumer>::Error> {
        loop {
            let composite = match self.consumer.consume()? {
                Some(composite) => composite,
                None => return Ok(()),
            };

            P::strip_from(&composite)
                .into_iter()
                .for_each(|part| self.parts.push(part));
        }
    }
}

impl<C, P> Consumer for StrippingConsumer<C, P>
where
    C: Consumer,
    P: StripFrom<<C as Consumer>::Good>,
{
    type Good = P;
    type Error = <C as Consumer>::Error;

    /// Hands out the next queued part after stripping everything the inner
    /// consumer currently offers.
    ///
    /// A queued part takes precedence over an error of the inner consumer:
    /// the error is only returned when no part is queued. `Ok(None)` is
    /// returned when no part is queued and the inner consumer did not fail.
    #[inline]
    fn consume(&self) -> Result<Option<Self::Good>, Self::Error> {
        let stripped = self.strip();

        match self.parts.pop() {
            Ok(part) => Ok(Some(part)),
            Err(_) => stripped.map(|()| None),
        }
    }
}
/// Conversion of a list of parts of type `G` into an optional `Self`.
pub trait ComposeFrom<G>: Sized {
    /// Attempts to build a `Self` from `parts`.
    ///
    /// `parts` is passed mutably, so an implementation is able to remove the
    /// parts it used.
    fn compose_from(parts: &mut Vec<G>) -> Option<Self>;
}
/// Decides whether a good is allowed.
pub trait Inspector {
/// The type of good this inspector examines.
type Good;
/// Returns `true` when `good` is allowed and `false` when it is not.
fn allows(&self, good: &Self::Good) -> bool;
}
/// A consumer that only hands out the goods of an inner consumer that its
/// inspector allows.
#[derive(Debug)]
pub struct VigilantConsumer<C, I> {
    /// The consumer the goods are taken from.
    consumer: C,
    /// The inspector every good has to pass.
    inspector: I,
}

impl<C, I> VigilantConsumer<C, I> {
    /// Creates a `VigilantConsumer` that filters the goods of `consumer`
    /// through `inspector`.
    #[inline]
    pub const fn new(consumer: C, inspector: I) -> Self {
        Self {
            consumer,
            inspector,
        }
    }
}

impl<C, I> Consumer for VigilantConsumer<C, I>
where
    C: Consumer,
    I: Inspector<Good = <C as Consumer>::Good>,
{
    type Good = <C as Consumer>::Good;
    type Error = <C as Consumer>::Error;

    /// Returns the next good of the inner consumer that the inspector allows.
    ///
    /// Goods the inspector does not allow are dropped. `Ok(None)` is returned
    /// as soon as the inner consumer yields `Ok(None)`; errors of the inner
    /// consumer are passed through.
    #[inline]
    fn consume(&self) -> Result<Option<Self::Good>, Self::Error> {
        loop {
            let candidate = match self.consumer.consume()? {
                Some(candidate) => candidate,
                None => return Ok(None),
            };

            if self.inspector.allows(&candidate) {
                return Ok(Some(candidate));
            }
        }
    }
}
/// A producer that only forwards goods to an inner producer when its
/// inspector allows them.
#[derive(Debug)]
pub struct ApprovedProducer<P, I> {
    /// The producer allowed goods are forwarded to.
    producer: P,
    /// The inspector every good has to pass.
    inspector: I,
}

impl<P, I> ApprovedProducer<P, I> {
    /// Creates an `ApprovedProducer` that forwards to `producer` the goods
    /// allowed by `inspector`.
    #[inline]
    pub const fn new(producer: P, inspector: I) -> Self {
        Self {
            producer,
            inspector,
        }
    }
}

impl<P, I> Producer for ApprovedProducer<P, I>
where
    P: Producer,
    I: Inspector<Good = <P as Producer>::Good>,
{
    type Good = <P as Producer>::Good;
    type Error = <P as Producer>::Error;

    /// Forwards `good` to the inner producer when the inspector allows it.
    ///
    /// A good the inspector does not allow is dropped and reported as
    /// `Ok(None)`, which is the same result as for a good the inner producer
    /// took.
    #[inline]
    fn produce(&self, good: Self::Good) -> Result<Option<Self::Good>, Self::Error> {
        if !self.inspector.allows(&good) {
            return Ok(None);
        }

        self.producer.produce(good)
    }
}
/// A producer of bytes that writes them to a wrapped `Write` implementation.
#[derive(Debug)]
pub struct ByteWriter<W> {
    /// The writer, kept in a `RefCell` so it can be written to through `&self`.
    writer: RefCell<W>,
}

impl<W> ByteWriter<W> {
    /// Creates a `ByteWriter` that writes to `writer`.
    #[inline]
    pub const fn new(writer: W) -> Self {
        Self {
            writer: RefCell::new(writer),
        }
    }
}

impl<W> Producer for ByteWriter<W>
where
    W: Write,
{
    type Good = u8;
    type Error = io::Error;

    /// Writes the single byte `good`.
    ///
    /// Returns `Ok(None)` when the byte was written and hands it back as
    /// `Ok(Some(good))` when the writer reported that it wrote zero bytes.
    ///
    /// # Errors
    ///
    /// Passes through the error of the underlying `write` call.
    #[inline]
    fn produce(&self, good: Self::Good) -> Result<Option<Self::Good>, Self::Error> {
        let written = self.writer.borrow_mut().write(&[good])?;

        Ok(if written == 0 { Some(good) } else { None })
    }

    /// Writes `goods` with a single `write` call and returns the bytes that
    /// call did not write.
    ///
    /// # Errors
    ///
    /// Passes through the error of the underlying `write` call.
    #[inline]
    fn produce_all(&self, mut goods: Vec<Self::Good>) -> Result<Vec<Self::Good>, Self::Error> {
        let written = self.writer.borrow_mut().write(&goods)?;

        // Everything from index `written` onwards is still unwritten.
        Ok(goods.split_off(written))
    }
}
/// A consumer that composes goods of type `G` from the bytes of a wrapped
/// `BufRead` implementation.
#[derive(Debug)]
pub struct Reader<G, R> {
    #[doc(hidden)]
    phantom: PhantomData<G>,
    /// The byte source, kept in a `RefCell` so it can be read through `&self`.
    reader: RefCell<R>,
    /// Bytes that were read but not yet turned into a good.
    buffer: RefCell<Vec<u8>>,
}

impl<G, R> Reader<G, R> {
    /// Creates a `Reader` over `reader` with an empty byte buffer.
    #[inline]
    pub fn new(reader: R) -> Self {
        let buffer = RefCell::new(Vec::new());

        Self {
            phantom: PhantomData,
            reader: RefCell::new(reader),
            buffer,
        }
    }
}

impl<G, R> Consumer for Reader<G, R>
where
    G: ComposeFrom<u8>,
    R: BufRead,
{
    type Good = G;
    type Error = io::Error;

    /// Moves the bytes currently offered by the reader into the buffer and
    /// attempts to compose a good from the buffered bytes.
    ///
    /// Returns whatever `G::compose_from` makes of the buffer, so `Ok(None)`
    /// means no good could be composed from the bytes gathered so far.
    ///
    /// # Errors
    ///
    /// Passes through the error of the underlying `fill_buf` call.
    #[inline]
    fn consume(&self) -> Result<Option<Self::Good>, Self::Error> {
        let mut reader = self.reader.borrow_mut();

        let consumed_len = {
            let available = reader.fill_buf()?;
            self.buffer.borrow_mut().extend_from_slice(available);
            available.len()
        };
        // Mark the copied bytes as read so the next `fill_buf` does not return them again.
        reader.consume(consumed_len);

        let mut buffer = self.buffer.borrow_mut();
        Ok(G::compose_from(&mut buffer))
    }
}
/// An error type without variants; no value of this type can be constructed.
#[derive(Copy, Clone, Debug, ThisError)]
pub enum NeverErr {}
/// The error reported when a market is closed; displays as `market is closed`.
#[derive(Clone, Copy, Debug, ThisError)]
#[error("market is closed")]
pub struct ClosedMarketError;
/// An unbounded queue of goods that can be closed.
///
/// The queue is both a `Consumer` and a `Producer`. Once closed it refuses
/// new goods, while goods that are already queued can still be consumed.
#[derive(Debug)]
pub struct UnlimitedQueue<G> {
    /// The queued goods.
    queue: SegQueue<G>,
    /// Set to `true` by `close`.
    is_closed: AtomicBool,
}

impl<G> UnlimitedQueue<G> {
    /// Creates an empty, open queue.
    #[must_use]
    #[inline]
    pub fn new() -> Self {
        Self::default()
    }

    /// Closes the queue.
    ///
    /// The flag is stored with `Release` ordering so that goods pushed by
    /// the closing thread before this call are visible to any consumer that
    /// observes the queue as closed.
    #[inline]
    pub fn close(&self) {
        self.is_closed.store(true, Ordering::Release);
    }
}

impl<G> Consumer for UnlimitedQueue<G> {
    type Good = G;
    type Error = ClosedMarketError;

    /// Takes the next good off the queue.
    ///
    /// Returns `Ok(None)` when the queue is empty but still open.
    ///
    /// # Errors
    ///
    /// Returns `ClosedMarketError` when the queue is empty and was already
    /// closed when this call started.
    #[inline]
    fn consume(&self) -> Result<Option<Self::Good>, Self::Error> {
        // The flag must be read BEFORE popping. Reading it after a failed pop
        // would let a `produce` + `close` that happen in between turn into a
        // `ClosedMarketError` while the produced good is still queued.
        let was_closed = self.is_closed.load(Ordering::Acquire);

        match self.queue.pop() {
            Ok(good) => Ok(Some(good)),
            Err(_) if was_closed => Err(ClosedMarketError),
            Err(_) => Ok(None),
        }
    }
}

impl<G> Default for UnlimitedQueue<G> {
    /// Creates an empty, open queue; places no bound on `G`.
    #[inline]
    fn default() -> Self {
        Self {
            queue: SegQueue::new(),
            is_closed: AtomicBool::new(false),
        }
    }
}

impl<G> Producer for UnlimitedQueue<G> {
    type Good = G;
    type Error = ClosedMarketError;

    /// Appends `good` to the queue; always returns `Ok(None)` on success.
    ///
    /// NOTE(review): the closed check and the push are two separate steps, so
    /// a `produce` racing with `close` on another thread may still enqueue
    /// its good after the queue was closed.
    ///
    /// # Errors
    ///
    /// Returns `ClosedMarketError` (dropping `good`) when the queue is closed.
    #[inline]
    fn produce(&self, good: Self::Good) -> Result<Option<Self::Good>, Self::Error> {
        if self.is_closed.load(Ordering::Acquire) {
            Err(ClosedMarketError)
        } else {
            self.queue.push(good);
            Ok(None)
        }
    }
}
/// An unbounded queue of goods that cannot be closed.
///
/// The queue is both a `Consumer` and a `Producer`; neither side can fail,
/// which is expressed by the `NeverErr` error type.
#[derive(Debug)]
pub struct PermanentQueue<G> {
    /// The queued goods.
    queue: SegQueue<G>,
}

impl<G> PermanentQueue<G> {
    /// Creates an empty queue.
    #[must_use]
    #[inline]
    pub fn new() -> Self {
        Self {
            queue: SegQueue::new(),
        }
    }
}

// Implemented by hand instead of derived: `#[derive(Default)]` would add a
// `G: Default` bound even though an empty queue needs no default good.
impl<G> Default for PermanentQueue<G> {
    /// Creates an empty queue; equivalent to `PermanentQueue::new`.
    #[inline]
    fn default() -> Self {
        Self::new()
    }
}

impl<G> Consumer for PermanentQueue<G> {
    type Good = G;
    type Error = NeverErr;

    /// Takes the next good off the queue, or returns `Ok(None)` when the
    /// queue is empty. Never fails.
    #[inline]
    fn consume(&self) -> Result<Option<Self::Good>, Self::Error> {
        Ok(self.queue.pop().ok())
    }
}

impl<G> Producer for PermanentQueue<G> {
    type Good = G;
    type Error = NeverErr;

    /// Appends `good` to the queue; always returns `Ok(None)`.
    #[inline]
    fn produce(&self, good: Self::Good) -> Result<Option<Self::Good>, Self::Error> {
        self.queue.push(good);
        Ok(None)
    }
}