1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
//! Implements [`Consumer`] for a thread.
//!
//! A thread consists of code that is executed separately. When the code is completed, the status can be consumed.
use {
    crate::{
        sync::{create_delivery, create_lock, Accepter, Deliverer, Trigger},
        ConsumeFailure, Consumer, Producer,
    },
    core::convert::TryFrom,
    fehler::{throw, throws},
    std::{
        any::Any,
        panic::{catch_unwind, AssertUnwindSafe, RefUnwindSafe},
        thread::spawn,
    },
};

/// The type returned by [`catch_unwind()`] when a panic is caught.
type Panic = Box<dyn Any + Send + 'static>;

/// The type returned by a thread call which can represent a success of type `S`, an error of type `E`, or a panic.
#[derive(Debug, parse_display::Display)]
enum Status<S, E> {
    /// The thread call completed sucessfully.
    #[display("{0}")]
    Success(S),
    /// The thread call threw an error.
    #[display("ERROR: {0}")]
    Error(E),
    /// The thread call panicked.
    #[display("PANIC")]
    Panic(Panic),
}

impl<S, E> Status<S, E> {
    /// If `self` represents a success.
    const fn is_success(&self) -> bool {
        matches!(*self, Self::Success(_))
    }
}

/// Describes the kind of thread.
#[derive(Clone, Copy, Debug)]
pub enum Kind {
    /// Runs the call a single time.
    Single,
    /// Will continue running call until cancelled.
    Cancelable,
}

/// A wrapper around the [`std::thread`] functionality.
///
/// Consumes the status of running the given closure. Thus, consumption replaces the functionality of [`std::thread::JoinHandle::join()`].
#[derive(Debug)]
pub struct Thread<S, E> {
    /// Consumes the status of the call.
    consumer: Accepter<Status<S, E>>,
    /// [`Trigger`] to cancel a cancelable thread.
    trigger: Option<Trigger>,
    /// Name of the thread.
    name: String,
}

impl<S: Send + 'static, E: TryFrom<ConsumeFailure<E>> + Send + 'static> Thread<S, E> {
    /// Creates a new [`Thread`] and spawns `call`.
    #[inline]
    pub fn new<
        P: Send + 'static,
        F: FnMut(&mut P) -> Result<S, E> + RefUnwindSafe + Send + 'static,
    >(
        name: String,
        kind: Kind,
        mut parameters: P,
        mut call: F,
    ) -> Self {
        let (producer, consumer) = create_delivery::<Status<S, E>>();

        match kind {
            Kind::Single => {
                let _ = spawn(move || {
                    Self::produce_outcome(Self::run(&mut parameters, &mut call), &producer);
                });

                Self {
                    name,
                    consumer,
                    trigger: None,
                }
            }
            Kind::Cancelable => {
                let (trigger, hammer) = create_lock();
                let _ = spawn(move || {
                    let mut status = Self::run(&mut parameters, &mut call);

                    while hammer.consume().is_err() && status.is_success() {
                        status = Self::run(&mut parameters, &mut call);
                    }

                    Self::produce_outcome(status, &producer);
                });

                Self {
                    name,
                    consumer,
                    trigger: Some(trigger),
                }
            }
        }
    }

    /// Runs `call` and catches any panics.
    fn run<P, F: FnMut(&mut P) -> Result<S, E> + RefUnwindSafe + Send + 'static>(
        mut parameters: &mut P,
        call: &mut F,
    ) -> Status<S, E> {
        match catch_unwind(AssertUnwindSafe(|| (call)(&mut parameters))) {
            Ok(Ok(success)) => Status::Success(success),
            Ok(Err(error)) => Status::Error(error),
            Err(panic) => Status::Panic(panic),
        }
    }

    /// Produces `status` via `producer`.
    fn produce_outcome(status: Status<S, E>, producer: &Deliverer<Status<S, E>>) {
        // Although force is preferable to produce, force requires status impl Clone and the panic value is not bound to impl Clone. Using produce should be fine because produce should never be blocked since this market has a single producer storing a single good.
        #[allow(clippy::unwrap_used)]
        // Passer::produce() can only fail when the stock is full. Since we only call this once, this should never happen.
        producer.produce(status).unwrap();
    }

    /// Requests that `self` be canceled.
    #[inline]
    pub fn cancel(&self) {
        if let Some(trigger) = self.trigger.as_ref() {
            #[allow(clippy::unwrap_used)] // Trigger::produce() cannot fail.
            trigger.produce(()).unwrap();
        }
    }
}

impl<S, E: TryFrom<ConsumeFailure<E>>> Consumer for Thread<S, E> {
    type Good = S;
    type Failure = ConsumeFailure<E>;

    #[allow(clippy::panic_in_result_fn)] // Propogate the panic that occurred in call provided by client.
    #[inline]
    #[throws(Self::Failure)]
    fn consume(&self) -> Self::Good {
        match self.consumer.consume() {
            Ok(status) => match status {
                Status::Success(success) => success,
                Status::Error(error) => throw!(error),
                Status::Panic(panic) => {
                    log::error!("Panic was caught in thread `{}`", self.name);
                    #[allow(clippy::panic)]
                    {
                        // Propogate the panic that occurred in call provided by client.
                        panic!(panic);
                    }
                }
            },
            // Accepter::Failure is FaultlessFailure so a failure means the stock is empty.
            Err(_) => throw!(ConsumeFailure::EmptyStock),
        }
    }
}