use {
crate::queue_crossbeam::CrossbeamInfiniteQueue,
alloc::{boxed::Box, format, string::String, sync::Arc},
core::{
any::Any,
fmt::{self, Display, Formatter},
sync::atomic::{AtomicBool, Ordering},
},
fehler::{throw, throws},
market::{
queue::InfiniteQueue, Agent, Consumer, ConsumptionFlaws, EmptyStock, Failure, Fault,
Producer,
},
std::{
panic::{self, AssertUnwindSafe, RefUnwindSafe},
thread,
},
};
type Panic = Box<dyn Any + Send + 'static>;
#[derive(Debug)]
#[non_exhaustive]
pub enum Outcome<A, E> {
Answer(A),
Error(E),
Panic(Panic),
}
#[derive(Debug)]
pub struct Thread<A, E> {
name: String,
outcome_queue: Arc<CrossbeamInfiniteQueue<Outcome<A, E>>>,
}
impl<A: Send + 'static, E: Send + 'static> Thread<A, E> {
pub fn new<
P: Send + 'static,
F: FnMut(&mut P) -> Result<A, E> + RefUnwindSafe + Send + 'static,
S,
>(
mut parameters: P,
mut call: F,
name_str: &S,
) -> Self
where
S: AsRef<str> + ?Sized,
{
let outcome_queue = Arc::new(CrossbeamInfiniteQueue::allocate(&format!(
"outcome queue for thread `{}`",
name_str.as_ref()
)));
let queue_clone = Arc::clone(&outcome_queue);
drop(thread::spawn(move || {
queue_clone.produce(Self::run(&mut parameters, &mut call))
}));
Self {
name: String::from(name_str.as_ref()),
outcome_queue,
}
}
fn run<P, F: FnMut(&mut P) -> Result<A, E> + RefUnwindSafe + Send + 'static>(
mut parameters: &mut P,
call: &mut F,
) -> Outcome<A, E> {
match panic::catch_unwind(AssertUnwindSafe(|| (call)(&mut parameters))) {
Ok(Ok(answer)) => Outcome::Answer(answer),
Ok(Err(error)) => Outcome::Error(error),
Err(panic) => Outcome::Panic(panic),
}
}
}
impl<A, E> Agent for Thread<A, E> {
type Good = A;
}
impl<A, E> Consumer for Thread<A, E> {
type Flaws = ConsumptionFlaws<E>;
#[allow(clippy::panic_in_result_fn)] #[throws(Failure<Self::Flaws>)]
fn consume(&self) -> Self::Good {
match self
.outcome_queue
.consume()
.map_err(|_failure| self.failure(Fault::Insufficiency(EmptyStock::default())))?
{
Outcome::Answer(answer) => answer,
Outcome::Error(error) => throw!(self.failure(Fault::Defect(error))),
Outcome::Panic(panic) => {
log::error!("Panic was caught in thread `{}`", self.name);
#[allow(clippy::panic)]
{
panic::panic_any(panic);
}
}
}
}
}
impl<A, E> Display for Thread<A, E> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.name)
}
}
pub trait Repeater {
type Parameters;
type Answer;
type Error;
fn single_call(&self, parameters: &Self::Parameters) -> Result<Self::Answer, Self::Error>;
#[throws(Self::Error)]
fn call(&self, parameters: RepeaterParams<Self::Parameters>) -> Self::Answer {
let mut answer = self.single_call(¶meters.params)?;
while !parameters.will_cancel.load(Ordering::Acquire) {
answer = self.single_call(¶meters.params)?;
}
answer
}
}
#[derive(Debug)]
pub struct RepeaterParams<P> {
will_cancel: Arc<AtomicBool>,
params: P,
}