market_types 0.1.0

Common implementations of a market
//! Implements [`Producer`] and [`Consumer`] for a thread.
use {
    crate::queue_crossbeam::CrossbeamInfiniteQueue,
    alloc::{boxed::Box, format, string::String, sync::Arc},
    core::{
        any::Any,
        fmt::{self, Display, Formatter},
        sync::atomic::{AtomicBool, Ordering},
    },
    fehler::{throw, throws},
    market::{
        queue::InfiniteQueue, Agent, Consumer, ConsumptionFlaws, EmptyStock, Failure, Fault,
        Producer,
    },
    std::{
        panic::{self, AssertUnwindSafe, RefUnwindSafe},
        thread,
    },
};

/// The payload carried by the `Err` that [`std::panic::catch_unwind()`] returns when a panic is
/// caught.
type Panic = Box<dyn Any + Send + 'static>;

/// The result of a thread once it has finished.
#[derive(Debug)]
#[non_exhaustive]
pub enum Outcome<A, E> {
    /// The thread returned an answer.
    Answer(A),
    /// The thread returned an error.
    Error(E),
    /// The thread panicked; holds the payload of the caught panic.
    Panic(Panic),
}

/// A [`Consumer`] of the [`Outcome`] generated by a thread.
#[derive(Debug)]
pub struct Thread<A, E> {
    /// The name of the thread; used for display and log messages.
    name: String,
    /// Transfers the [`Outcome`] from the spawned thread to this [`Thread`].
    ///
    /// Uses an infinite queue so that production cannot fail.
    outcome_queue: Arc<CrossbeamInfiniteQueue<Outcome<A, E>>>,
}

impl<A: Send + 'static, E: Send + 'static> Thread<A, E> {
    /// Creates a new [`Thread`] to run `call` with `parameters`..
    pub fn new<
        P: Send + 'static,
        F: FnMut(&mut P) -> Result<A, E> + RefUnwindSafe + Send + 'static,
        S,
    >(
        mut parameters: P,
        mut call: F,
        name_str: &S,
    ) -> Self
    where
        S: AsRef<str> + ?Sized,
    {
        let outcome_queue = Arc::new(CrossbeamInfiniteQueue::allocate(&format!(
            "outcome queue for thread `{}`",
            name_str.as_ref()
        )));
        let queue_clone = Arc::clone(&outcome_queue);

        // Drop JoinHandle because Thread will not attempt to join.
        drop(thread::spawn(move || {
            queue_clone.produce(Self::run(&mut parameters, &mut call))
        }));

        Self {
            name: String::from(name_str.as_ref()),
            outcome_queue,
        }
    }

    /// Runs `call` with `parameters` to completion and returns its [`Outcome`].
    fn run<P, F: FnMut(&mut P) -> Result<A, E> + RefUnwindSafe + Send + 'static>(
        mut parameters: &mut P,
        call: &mut F,
    ) -> Outcome<A, E> {
        match panic::catch_unwind(AssertUnwindSafe(|| (call)(&mut parameters))) {
            Ok(Ok(answer)) => Outcome::Answer(answer),
            Ok(Err(error)) => Outcome::Error(error),
            Err(panic) => Outcome::Panic(panic),
        }
    }
}

/// The good handled by a [`Thread`] is the answer `A`.
impl<A, E> Agent for Thread<A, E> {
    type Good = A;
}

impl<A, E> Consumer for Thread<A, E> {
    type Flaws = ConsumptionFlaws<E>;

    /// Consumes the [`Outcome`] of the thread and returns its answer.
    ///
    /// # Errors
    ///
    /// Throws an insufficiency failure if no [`Outcome`] could be consumed from the queue, and a
    /// defect failure holding the error if the thread returned an error.
    ///
    /// # Panics
    ///
    /// If the call run by the thread panicked, that panic is propagated to the caller with its
    /// original payload.
    #[throws(Failure<Self::Flaws>)]
    fn consume(&self) -> Self::Good {
        // Must map failure instead of using blame because unable to satisfy E: From<Infallible>.
        match self
            .outcome_queue
            .consume()
            .map_err(|_failure| self.failure(Fault::Insufficiency(EmptyStock::default())))?
        {
            Outcome::Answer(answer) => answer,
            Outcome::Error(error) => throw!(self.failure(Fault::Defect(error))),
            Outcome::Panic(panic) => {
                log::error!("Panic was caught in thread `{}`", self.name);
                // Propagate the panic that occurred in call provided by client. The payload is
                // already boxed, so resume_unwind() re-raises it unchanged; panic_any() would
                // wrap it in a second Box and hide the original message from downcasts.
                panic::resume_unwind(panic)
            }
        }
    }
}

impl<A, E> Display for Thread<A, E> {
    /// Writes the name of the thread.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.write_str(&self.name)
    }
}

/// Repeats a defined function until cancelled or an error occurs.
pub trait Repeater {
    /// The parameters input to the function.
    type Parameters;
    /// The answer returned by the function.
    type Answer;
    /// The error thrown by the function.
    type Error;

    /// The function that is repeated by `Self`.
    ///
    /// # Errors
    ///
    /// Throws [`Self::Error`] if an error occurs within the function.
    fn single_call(&self, parameters: &Self::Parameters) -> Result<Self::Answer, Self::Error>;

    /// The call provided to [`Thread::new()`].
    ///
    /// Runs [`Repeater::single_call()`] at least once, then keeps repeating it until the
    /// cancellation flag of `parameters` is observed to be set. Returns the answer of the last
    /// call.
    ///
    /// # Errors
    ///
    /// Throws [`Self::Error`] as soon as any call fails.
    #[throws(Self::Error)]
    fn call(&self, parameters: RepeaterParams<Self::Parameters>) -> Self::Answer {
        let cancel_flag = &parameters.will_cancel;
        let inputs = &parameters.params;
        let mut latest = self.single_call(inputs)?;

        loop {
            // The flag is only checked between calls, so the first call always runs.
            if cancel_flag.load(Ordering::Acquire) {
                break latest;
            }

            latest = self.single_call(inputs)?;
        }
    }
}

/// Parameters for a [`Repeater`].
#[derive(Debug)]
pub struct RepeaterParams<P> {
    /// If the repeater will cancel before making its next call.
    will_cancel: Arc<AtomicBool>,
    /// The parameters given to each call.
    params: P,
}