#![warn(missing_docs)]
use std::{
panic,
sync::{
mpsc::{channel, Receiver, Sender},
Arc, Mutex, MutexGuard,
},
};
use internal_context::InternalData;
pub struct Executor<T> {
state: Arc<Mutex<InternalData<T>>>,
}
impl<T> Executor<T>
where
T: Send + Sync + 'static,
{
pub fn new<E, F>(e: E, f: F) -> Self
where
F: FnOnce() -> T + Send + 'static,
E: Context,
{
let state = Arc::new(Mutex::new(InternalData { res: None }));
e.execute(state.clone(), f);
Self { state }
}
fn lock_state(&self) -> MutexGuard<'_, InternalData<T>> {
match self.state.lock() {
Ok(g) => g,
Err(e) => {
panic!("Cannot lock internal state of Executor. This means that the lock is poisoned, and *that*, in turn, means that the thread has paniced. Lock result: {}",e);
}
}
}
pub fn is_done(&self) -> bool {
self.lock_state().res.is_some()
}
pub fn result(&self) -> Option<Arc<T>> {
self.lock_state().res.clone()
}
pub fn take_result(self) -> Option<T> {
match Arc::into_inner(self.state) {
None => None,
Some(mtx) => {
let state = mtx
.into_inner()
.expect("Executor state was poisoned, probably due to panic in work thread.");
state.res.map(|x| Arc::into_inner(x)).unwrap_or(None)
}
}
}
}
pub struct StatusExecutor<T, S> {
state: Arc<Mutex<InternalData<T>>>,
rx: Receiver<S>,
last_status: Mutex<Option<S>>,
}
impl<T, S> StatusExecutor<T, S>
where
T: Send + Sync + 'static,
S: Send + Clone + 'static,
{
pub fn new<E, F>(e: E, f: F) -> Self
where
F: FnOnce(StatusSender<S>) -> T + Send + 'static,
E: Context,
{
let state = Arc::new(Mutex::new(InternalData { res: None }));
let (tx, rx) = channel();
let sender = StatusSender { tx };
e.execute(state.clone(), move || f(sender));
Self {
state,
rx,
last_status: Mutex::new(None),
}
}
fn lock_state(&self) -> MutexGuard<'_, InternalData<T>> {
self.state.lock().expect("Cannot lock internal state of StatusExecutor. This means that the lock is poisoned, and *that*, in turn, means that the thread has panicked.")
}
fn lock_last_status(&self) -> MutexGuard<'_, Option<S>> {
self.last_status.lock().expect("Cannot lock internal last status of StatusExecutor. This means that the lock is poisoned, and *that*, in turn, means that the only owning thread has panicked by this point. I am unsure how one could get to this point.")
}
pub fn is_done(&self) -> bool {
self.lock_state().res.is_some()
}
pub fn result(&self) -> Option<Arc<T>> {
self.lock_state().res.clone()
}
pub fn take_result(self) -> Option<T> {
match Arc::into_inner(self.state) {
None => None,
Some(mtx) => {
let state = mtx.into_inner().expect(
"StatusExecutor state was poisoned, probably due to panic in work thread.",
);
state.res.map(|x| Arc::into_inner(x)).unwrap_or(None)
}
}
}
pub fn status(&self) -> Option<S> {
match self.rx.try_recv() {
Ok(s) => {
*self.lock_last_status() = Some(s.clone());
Some(s)
}
_ => None,
}
}
pub fn latest_status(&self) -> Option<S> {
let latest = self.rx.try_iter().last();
match latest {
Some(s) => {
*self.lock_last_status() = Some(s.clone());
Some(s)
}
None => self.lock_last_status().clone(),
}
}
}
pub struct StatusSender<S> {
tx: Sender<S>,
}
impl<S> StatusSender<S>
where
S: Send,
{
pub fn send(&self, s: S) -> bool {
self.tx.send(s).is_ok()
}
}
mod internal_context {
use std::sync::{Arc, Mutex};
pub struct InternalData<T> {
pub res: Option<Arc<T>>,
}
pub trait InternalContext {
fn execute<T, F>(&self, state: Arc<Mutex<InternalData<T>>>, f: F)
where
T: Send + Sync + 'static,
F: FnOnce() -> T + Send + 'static;
}
}
pub trait Context: internal_context::InternalContext {}
#[derive(Debug, Default)]
pub struct StdContext {
_tag: (),
}
impl Context for StdContext {}
impl internal_context::InternalContext for StdContext {
fn execute<T, F>(&self, state: Arc<Mutex<InternalData<T>>>, f: F)
where
T: Send + Sync + 'static,
F: FnOnce() -> T + Send + 'static,
{
let res = panic::catch_unwind(panic::AssertUnwindSafe(move || {
std::thread::spawn(move || {
let r = f();
state
.lock()
.expect("main thread panicked for this executor")
.res = Some(Arc::new(r));
});
}));
if res.is_err() {
std::process::abort();
}
}
}
#[cfg(feature = "rayon")]
pub mod rayon_context {
use std::sync::{Arc, Mutex};
use rayon::ThreadPool;
use crate::internal_context::{InternalContext, InternalData};
#[derive(Debug, Default)]
pub struct RayonGlobalContext {
_tag: (),
}
impl InternalContext for RayonGlobalContext {
fn execute<T, F>(&self, state: Arc<Mutex<InternalData<T>>>, f: F)
where
T: Send + Sync + 'static,
F: FnOnce() -> T + Send + 'static,
{
rayon::spawn(move || {
let r = f();
state
.lock()
.expect("main thread panicked for this executor")
.res = Some(Arc::new(r));
});
}
}
impl crate::Context for RayonGlobalContext {}
#[derive(Debug)]
pub struct RayonContext<'a> {
pool: &'a ThreadPool,
}
impl<'a> RayonContext<'a> {
pub fn new(pool: &'a ThreadPool) -> Self {
Self { pool }
}
}
impl InternalContext for RayonContext<'_> {
fn execute<T, F>(&self, state: Arc<Mutex<InternalData<T>>>, f: F)
where
T: Send + Sync + 'static,
F: FnOnce() -> T + Send + 'static,
{
self.pool.spawn(move || {
let r = f();
state
.lock()
.expect("main thread panicked for this executor")
.res = Some(Arc::new(r));
});
}
}
impl crate::Context for RayonContext<'_> {}
}
#[cfg(feature = "rayon")]
pub use rayon_context::*;
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn executor_std() {
let e = Executor::new(StdContext::default(), move || 1234);
while !e.is_done() {
std::thread::yield_now();
}
assert_eq!(e.result().map(|x| *x), Some(1234));
assert_eq!(e.take_result(), Some(1234));
}
#[test]
fn status_executor_std() {
let e = StatusExecutor::new(StdContext::default(), move |s| {
s.send(432);
s.send(999);
s.send(4);
1234
});
while !e.is_done() {
std::thread::yield_now();
}
assert_eq!(e.status(), Some(432));
assert_eq!(e.latest_status(), Some(4));
assert_eq!(e.result().map(|x| *x), Some(1234));
assert_eq!(e.take_result(), Some(1234));
}
}