use std::cell::RefCell;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread::JoinHandle;
use std::time::Duration;
use crossbeam_channel::Receiver;
use crossbeam_channel::RecvTimeoutError;
use crossbeam_channel::Select;
use crossbeam_channel::SelectedOperation;
use crossbeam_channel::Sender;
use crate::registry::deregister_thread;
use crate::registry::register_thread;
use crate::status::RegisteredStatus;
use crate::ErrorKind;
use crate::Result;
mod map;
pub use self::map::MapThread;
pub struct ThreadScopeActivityGuard {
activity: Arc<Mutex<Option<String>>>,
current: Option<String>,
}
impl Drop for ThreadScopeActivityGuard {
fn drop(&mut self) {
let mut guard = self
.activity
.lock()
.expect("ThreadScopeActivityGuard::activity lock poisoned");
*guard = self.current.take();
}
}
pub struct Thread<T: Send + 'static> {
join: RefCell<Option<JoinHandle<T>>>,
join_check: Receiver<()>,
shutdown: Arc<AtomicBool>,
}
impl<T: Send + 'static> Thread<T> {
pub(crate) fn new(
join: JoinHandle<T>,
join_check: Receiver<()>,
shutdown: Arc<AtomicBool>,
) -> Thread<T> {
let join = RefCell::new(Some(join));
Thread {
join,
join_check,
shutdown,
}
}
pub fn join(&self) -> Result<T> {
let handle = self
.join
.try_borrow_mut()
.map_err(|_| ErrorKind::JoinedAlready)?
.take();
if handle.is_none() {
return Err(ErrorKind::JoinedAlready.into());
}
handle
.expect("the handle should be Some here")
.join()
.map_err(|error| ErrorKind::Join(Mutex::new(error)).into())
}
pub fn join_timeout(&self, timeout: Duration) -> Result<T> {
match self.join_check.recv_timeout(timeout) {
Err(RecvTimeoutError::Timeout) => Err(ErrorKind::JoinTimeout.into()),
_ => self.join(),
}
}
pub fn map<U, F>(self, mut f: F) -> MapThread<U>
where
U: Send + 'static,
F: FnMut(T) -> U + 'static,
{
let mut join = self.join.into_inner();
let join = move || {
let join = match join.take() {
Some(join) => join,
None => return Err(ErrorKind::JoinedAlready.into()),
};
#[allow(clippy::redundant_closure)]
join.join()
.map_err(|error| ErrorKind::Join(Mutex::new(error)).into())
.map(|r| f(r))
};
MapThread::new(join, self.join_check, self.shutdown)
}
pub fn request_shutdown(&self) {
self.shutdown.store(true, Ordering::Relaxed);
}
pub fn select_add<'a>(&'a self, select: &mut Select<'a>) -> usize {
select.recv(&self.join_check)
}
pub fn select_join(&self, operation: SelectedOperation) -> Result<T> {
let _ = operation.recv(&self.join_check);
self.join()
}
}
pub struct ThreadScope {
activity: Arc<Mutex<Option<String>>>,
shutdown: Arc<AtomicBool>,
}
impl ThreadScope {
pub(crate) fn new(
activity: Arc<Mutex<Option<String>>>,
shutdown: Arc<AtomicBool>,
) -> ThreadScope {
ThreadScope { activity, shutdown }
}
pub fn activity<S: Into<String>>(&self, activity: S) {
let activity = activity.into();
let mut guard = self
.activity
.lock()
.expect("ThreadScope::activity lock poisoned");
*guard = Some(activity);
}
pub fn idle(&self) {
let mut guard = self
.activity
.lock()
.expect("ThreadScope::activity lock poisoned");
*guard = None;
}
pub fn scoped_activity<S: Into<String>>(&self, activity: S) -> ThreadScopeActivityGuard {
let activity = activity.into();
let mut guard = self
.activity
.lock()
.expect("ThreadScope::activity lock poisoned");
let current: Option<String> = guard.clone();
*guard = Some(activity);
drop(guard);
let activity = Arc::clone(&self.activity);
ThreadScopeActivityGuard { activity, current }
}
pub fn should_shutdown(&self) -> bool {
self.shutdown.load(Ordering::Relaxed)
}
}
pub(crate) struct ThreadGuard {
id: u64,
join_check: Sender<()>,
}
impl ThreadGuard {
pub(crate) fn new(id: u64, join_check: Sender<()>, status: RegisteredStatus) -> ThreadGuard {
register_thread(id, status);
ThreadGuard { id, join_check }
}
}
impl Drop for ThreadGuard {
fn drop(&mut self) {
let _ = self.join_check.try_send(());
deregister_thread(self.id);
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use crossbeam_channel::Select;
use super::super::registered_threads;
use super::super::Builder;
#[test]
fn activity() {
let thread = Builder::new("activity")
.spawn(|scope| {
scope.activity("testing activity API");
loop {
::std::thread::sleep(Duration::from_millis(10));
if scope.should_shutdown() {
break;
}
}
})
.expect("to spawn test thread");
::std::thread::sleep(::std::time::Duration::from_millis(10));
let threads = registered_threads();
thread.request_shutdown();
thread.join().expect("the thread to stop");
let thread = threads
.into_iter()
.find(|t| t.name == "activity")
.expect("test thread not found");
assert_eq!(Some("testing activity API".into()), thread.activity);
}
#[test]
fn idle() {
let thread = Builder::new("idle")
.spawn(|scope| {
scope.activity("testing activity API");
scope.idle();
loop {
::std::thread::sleep(Duration::from_millis(10));
if scope.should_shutdown() {
break;
}
}
})
.expect("to spawn test thread");
::std::thread::sleep(::std::time::Duration::from_millis(10));
let threads = registered_threads();
thread.request_shutdown();
thread.join().expect("the thread to stop");
let thread = threads
.into_iter()
.find(|t| t.name == "idle")
.expect("test thread not found");
assert_eq!(None, thread.activity);
}
#[test]
fn join_timeout() {
let thread = Builder::new("request_shutdown")
.spawn(|scope| loop {
::std::thread::sleep(Duration::from_millis(10));
if scope.should_shutdown() {
break;
}
})
.expect("to spawn test thread");
thread.request_shutdown();
thread
.join_timeout(Duration::from_millis(15))
.expect("the thread to stop");
}
#[test]
fn request_shutdown() {
let thread = Builder::new("request_shutdown")
.spawn(|scope| loop {
::std::thread::sleep(Duration::from_millis(10));
if scope.should_shutdown() {
break;
}
})
.expect("to spawn test thread");
thread.request_shutdown();
thread.join().expect("the thread to stop");
}
#[test]
fn scoped_activity() {
let (notifier, notifiction) = ::crossbeam_channel::bounded(0);
let thread = Builder::new("scoped_activity")
.spawn(move |scope| {
notifiction
.recv_timeout(Duration::from_millis(50))
.expect("proceed to scope1");
{
let scope1 = scope.scoped_activity("scope1");
notifiction
.recv_timeout(Duration::from_millis(50))
.expect("proceed to scope2");
let scope2 = scope.scoped_activity("scope2");
notifiction
.recv_timeout(Duration::from_millis(50))
.expect("proceed out of scope2");
drop(scope2);
notifiction
.recv_timeout(Duration::from_millis(50))
.expect("proceed out of scope1");
drop(scope1);
notifiction
.recv_timeout(Duration::from_millis(50))
.expect("proceed to scope3");
}
let scope3 = scope.scoped_activity("scope3");
notifiction
.recv_timeout(Duration::from_millis(50))
.expect("proceed out of scope3");
drop(scope3);
notifiction
.recv_timeout(Duration::from_millis(50))
.expect("proceed to thread exit");
})
.expect("to spawn test thread");
::std::thread::sleep(Duration::from_millis(10));
let start = registered_threads();
notifier.send(()).expect("proceed to scope1");
::std::thread::sleep(Duration::from_millis(10));
let scope1_in = registered_threads();
notifier.send(()).expect("proceed to scope2");
::std::thread::sleep(Duration::from_millis(10));
let scope2_in = registered_threads();
notifier.send(()).expect("proceed out of scope2");
::std::thread::sleep(Duration::from_millis(10));
let scope2_out = registered_threads();
notifier.send(()).expect("proceed out of scope1");
::std::thread::sleep(Duration::from_millis(10));
let scope1_out = registered_threads();
notifier.send(()).expect("proceed to scope3");
::std::thread::sleep(Duration::from_millis(10));
let scope3_in = registered_threads();
notifier.send(()).expect("proceed out of scope3");
::std::thread::sleep(Duration::from_millis(10));
let scope3_out = registered_threads();
notifier.send(()).expect("proceed to thread exit");
thread.request_shutdown();
thread.join().expect("the thread to stop");
let status = start
.into_iter()
.find(|t| t.name == "scoped_activity")
.expect("test thread not found");
assert_eq!(None, status.activity);
let status = scope1_in
.into_iter()
.find(|t| t.name == "scoped_activity")
.expect("test thread not found");
assert_eq!(Some("scope1".into()), status.activity);
let status = scope2_in
.into_iter()
.find(|t| t.name == "scoped_activity")
.expect("test thread not found");
assert_eq!(Some("scope2".into()), status.activity);
let status = scope2_out
.into_iter()
.find(|t| t.name == "scoped_activity")
.expect("test thread not found");
assert_eq!(Some("scope1".into()), status.activity);
let status = scope1_out
.into_iter()
.find(|t| t.name == "scoped_activity")
.expect("test thread not found");
assert_eq!(None, status.activity);
let status = scope3_in
.into_iter()
.find(|t| t.name == "scoped_activity")
.expect("test thread not found");
assert_eq!(Some("scope3".into()), status.activity);
let status = scope3_out
.into_iter()
.find(|t| t.name == "scoped_activity")
.expect("test thread not found");
assert_eq!(None, status.activity);
}
#[test]
fn select_interface() {
let thread = Builder::new("select_interface")
.spawn(|_| {
::std::thread::sleep(Duration::from_millis(10));
})
.expect("to spawn test thread");
let mut set = Select::new();
let idx = thread.select_add(&mut set);
let op = set.select_timeout(Duration::from_millis(30)).unwrap();
thread.select_join(op).unwrap();
assert_eq!(0, idx);
}
#[test]
fn select_multiple_threads() {
let thread1 = Builder::new("select_multiple_threads_1")
.spawn(|_| {
::std::thread::sleep(Duration::from_millis(50));
})
.expect("to spawn test thread");
let thread2 = Builder::new("select_multiple_threads_2")
.spawn(|_| {
::std::thread::sleep(Duration::from_millis(10));
})
.expect("to spawn test thread");
let mut set = Select::new();
thread1.select_add(&mut set);
thread2.select_add(&mut set);
let op = set.select_timeout(Duration::from_millis(30)).unwrap();
let idx = op.index();
thread2.select_join(op).unwrap();
assert_eq!(1, idx);
}
#[test]
fn select_panic() {
let thread = Builder::new("select_panic")
.spawn(|_| {
::std::thread::sleep(Duration::from_millis(10));
panic!("this panic is expected");
})
.expect("to spawn test thread");
let mut set = Select::new();
thread.select_add(&mut set);
let op = set.select_timeout(Duration::from_millis(30)).unwrap();
let idx = op.index();
let result = thread.select_join(op);
assert_eq!(0, idx);
assert_eq!(true, result.is_err());
}
#[test]
fn select_ready_interface() {
let thread = Builder::new("select_panic")
.spawn(|_| {
::std::thread::sleep(Duration::from_millis(10));
})
.expect("to spawn test thread");
let mut set = Select::new();
thread.select_add(&mut set);
let idx = set.ready_timeout(Duration::from_millis(30)).unwrap();
assert_eq!(0, idx);
thread.join_timeout(Duration::from_millis(10)).unwrap();
}
}