message_io/util/
thread.rs

1use std::thread::{self, JoinHandle};
2
/// Panic message used when a joined thread turns out to have panicked.
///
/// It tells the reader that the panic being reported here only propagates a failure
/// that originated in another thread, so this secondary report can be ignored.
pub const OTHER_THREAD_ERR: &str = "Avoid this 'panicked_at' error. \
                                   This error is shown because other thread has panicked \
                                   You can safety skip this error.";
7
/// A thread similar to the one in `std`, but with a name that can be nested.
///
/// The name of a spawned thread is prefixed with the name of the thread that spawned it,
/// so threads created from other namespaced threads get path-like names.
/// Dropping a value that was never joined joins the underlying thread.
pub struct NamespacedThread<T: Send + 'static> {
    /// Full name of the thread: the spawning thread's name, a `/`, and the requested name.
    namespace: String,
    /// Handle of the underlying thread; `None` once the thread has been joined.
    join_handle: Option<JoinHandle<T>>,
}
13
14impl<T: Send + 'static> NamespacedThread<T> {
15    /// Similar to [`thread::spawn()`] but with a name.
16    pub fn spawn<F>(name: &str, f: F) -> Self
17    where
18        F: FnOnce() -> T,
19        F: Send + 'static,
20    {
21        let namespace = format!("{}/{}", thread::current().name().unwrap_or(""), name);
22        Self {
23            namespace: namespace.clone(),
24            join_handle: Some(
25                thread::Builder::new()
26                    .name(namespace.clone())
27                    .spawn(move || {
28                        log::trace!("Thread [{}] spawned", namespace);
29                        f()
30                    })
31                    .unwrap(),
32            ),
33        }
34    }
35
36    /// Wait the thread to finish.
37    pub fn join(&mut self) -> T {
38        log::trace!("Join thread [{}] ...", self.namespace);
39        let content = self.join_handle.take().unwrap().join().expect(OTHER_THREAD_ERR);
40        log::trace!("Joined thread [{}]", self.namespace);
41        content
42    }
43
44    /// Wait the thread to finish.
45    /// Returns the inner `T` value if never was joined, `None` otherwise
46    pub fn try_join(&mut self) -> Option<T> {
47        if self.join_handle.is_some() {
48            return Some(self.join());
49        }
50        None
51    }
52}
53
54impl<T: Send + 'static> Drop for NamespacedThread<T> {
55    fn drop(&mut self) {
56        self.try_join();
57    }
58}
59
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    /// The worker raises its flag only after its sleep has elapsed.
    #[test]
    fn basic_usage() {
        let flag = Arc::new(AtomicBool::new(false));
        let worker_flag = Arc::clone(&flag);
        let mut worker = NamespacedThread::spawn("test", move || {
            std::thread::sleep(Duration::from_millis(500));
            worker_flag.store(true, Ordering::Relaxed);
        });

        // Checked while the worker is still expected to be sleeping.
        std::thread::sleep(Duration::from_millis(250));
        assert!(!flag.load(Ordering::Relaxed));

        // Checked after the worker's sleep is expected to have elapsed.
        std::thread::sleep(Duration::from_millis(500));
        assert!(flag.load(Ordering::Relaxed));

        worker.join();
    }

    /// `join` hands back the value returned by the thread body.
    #[test]
    fn join_result() {
        let flag = Arc::new(AtomicBool::new(false));
        let worker_flag = Arc::clone(&flag);
        let mut worker = NamespacedThread::spawn("test", move || {
            std::thread::sleep(Duration::from_millis(500));
            worker_flag.store(true, Ordering::Relaxed);
            "result"
        });

        assert_eq!("result", worker.join());
        assert!(flag.load(Ordering::Relaxed));
    }

    /// Dropping a thread that was never joined waits for it to finish.
    #[test]
    fn drop_implies_join() {
        let flag = Arc::new(AtomicBool::new(false));
        let worker_flag = Arc::clone(&flag);
        let worker = NamespacedThread::spawn("test", move || {
            std::thread::sleep(Duration::from_millis(500));
            worker_flag.store(true, Ordering::Relaxed);
        });

        drop(worker);
        assert!(flag.load(Ordering::Relaxed));
    }
}
113}