1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
//! Infrastructure for producers and consumers in a market.
//!
//! A market is a stock of goods that can have agents act upon it. An agent can be either a [`Producer`] that stores goods into the market or a [`Consumer`] that retrieves goods from the market. The important thing to note about agents is that they are immutable during their respective actions.

// Use of market in derive macros requires defining crate as market.
extern crate self as market;

pub mod channel;
mod error;
pub mod io;
mod map;
pub mod process;
pub mod queue;
pub mod sync;
pub mod thread;
pub mod vec;

pub use {
    error::{ConsumeFailure, Failure, InsufficientStockFailure, ProduceFailure},
    market_derive::{ConsumeFault, ProduceFault},
};

use {
    core::fmt::Debug,
    fehler::{throw, throws},
};

/// An agent that stores goods into a market.
#[allow(clippy::missing_inline_in_public_items)] // works around a current fehler issue affecting produce(); see https://github.com/withoutboats/fehler/issues/39
pub trait Producer {
    /// The type of item that is stored.
    type Good;
    /// The type describing why a production was not completed.
    type Failure: Failure;

    /// Attempts to store `good` in the market, returning immediately.
    ///
    /// Implementations SHALL run only in the calling process and SHALL return without blocking.
    ///
    /// # Errors
    ///
    /// When fault `T` is caught, implementations SHALL throw a [`Self::Failure`] `F` for which `F.fault() == Some(T)`. When `good` cannot be stored without blocking, implementations SHALL throw an appropriate [`Self::Failure`].
    #[allow(redundant_semicolons, unused_variables)] // works around a current fehler issue; see https://github.com/withoutboats/fehler/issues/39
    #[throws(Self::Failure)]
    fn produce(&self, good: Self::Good);

    /// Attempts to store every good yielded by `goods`, in order, without blocking.
    ///
    /// # Errors
    ///
    /// The first [`Failure`] `F` thrown by [`Self::produce`] ends the iteration: the remaining goods are not attempted and `F` is thrown.
    #[throws(Self::Failure)]
    fn produce_all<I>(&self, goods: I)
    where
        Self: Sized,
        I: IntoIterator<Item = Self::Good>,
    {
        // `try_for_each` short-circuits on the first failed production.
        goods.into_iter().try_for_each(|good| self.produce(good))?;
    }

    /// Stores `good` in the market, retrying until a production succeeds.
    ///
    /// Every attempt hands a fresh clone of `good` to [`Self::produce`], which is why `Self::Good` must be [`Clone`].
    ///
    /// # Errors
    ///
    /// If a failed attempt reports fault `T`, SHALL stop retrying and throw `T`.
    #[inline]
    #[throws(<Self::Failure as Failure>::Fault)]
    fn force(&self, good: Self::Good)
    where
        Self::Good: Clone,
    {
        loop {
            match self.produce(good.clone()) {
                Ok(()) => break,
                Err(failure) => {
                    // A failure without a fault only means the attempt did not succeed; try again.
                    if let Some(fault) = failure.fault() {
                        throw!(fault);
                    }
                }
            }
        }
    }

    /// Stores every good yielded by `goods`, in order, retrying each one until it is stored.
    ///
    /// # Errors
    ///
    /// The first fault `T` thrown by [`Self::force`] ends the iteration: the remaining goods are not attempted and `T` is thrown.
    #[throws(<Self::Failure as Failure>::Fault)]
    fn force_all<I>(&self, goods: I)
    where
        Self: Sized,
        Self::Good: Clone,
        I: IntoIterator<Item = Self::Good>,
    {
        // `try_for_each` short-circuits on the first fault.
        goods.into_iter().try_for_each(|good| self.force(good))?;
    }
}

/// An agent that retrieves goods from a market.
///
/// Each implementer defines the order in which its goods are retrieved.
pub trait Consumer {
    /// The type of item that is retrieved.
    type Good;
    /// The type describing why a consumption was not completed.
    type Failure: Failure;

    /// Attempts to retrieve the next good from the market, returning immediately.
    ///
    /// Implementations SHALL run only in the calling process and SHALL return the next good in the market without blocking.
    ///
    /// # Errors
    ///
    /// When fault `T` is caught, implementations SHALL throw a [`Self::Failure`] `F` for which `F.fault() == Some(T)`. When a good cannot be retrieved without blocking, implementations SHALL throw an appropriate [`Self::Failure`].
    #[throws(Self::Failure)]
    fn consume(&self) -> Self::Good;

    /// Creates a [`Goods`] that borrows `self` as its consumer.
    #[inline]
    fn goods(&self) -> Goods<'_, Self>
    where
        Self: Sized,
    {
        Goods { consumer: self }
    }

    /// Retrieves the next good from the market, retrying until a consumption succeeds.
    ///
    /// # Errors
    ///
    /// If a failed attempt reports fault `T`, SHALL stop retrying and throw `T`.
    #[inline]
    #[throws(<Self::Failure as Failure>::Fault)]
    fn demand(&self) -> Self::Good {
        loop {
            let failure = match self.consume() {
                Ok(good) => break good,
                Err(failure) => failure,
            };

            // A failure without a fault only means the attempt did not succeed; try again.
            if let Some(fault) = failure.fault() {
                throw!(fault);
            }
        }
    }
}

/// An [`Iterator`] that yields the results of repeatedly consuming from a [`Consumer`].
#[derive(Debug)]
pub struct Goods<'a, C>
where
    C: Consumer,
{
    /// The borrowed [`Consumer`] from which goods are consumed.
    consumer: &'a C,
}

impl<C: Consumer> Iterator for Goods<'_, C> {
    type Item = Result<<C as Consumer>::Good, <<C as Consumer>::Failure as Failure>::Fault>;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        match self.consumer.consume() {
            Ok(good) => Some(Ok(good)),
            Err(failure) => failure.fault().map(Err),
        }
    }
}