use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
};
use std::thread;
use super::common::RawFunc;
use super::sync as fpSync;
use super::sync::Queue;
pub trait Handler: Send + Sync + 'static {
fn is_started(&mut self) -> bool;
fn is_alive(&mut self) -> bool;
fn start(&mut self);
fn stop(&mut self);
fn post(&mut self, func: RawFunc);
}
#[derive(Clone)]
pub struct HandlerThread {
started_alive: Arc<Mutex<(AtomicBool, AtomicBool)>>,
inner: Arc<HandlerThreadInner>,
handle: Arc<Mutex<Option<thread::JoinHandle<()>>>>,
}
impl Default for HandlerThread {
fn default() -> Self {
HandlerThread {
started_alive: Arc::new(Mutex::new((AtomicBool::new(false), AtomicBool::new(false)))),
inner: Arc::new(HandlerThreadInner::new()),
handle: Arc::new(Mutex::new(None)),
}
}
}
impl HandlerThread {
pub fn new() -> HandlerThread {
Default::default()
}
pub fn new_with_mutex() -> Arc<Mutex<HandlerThread>> {
Arc::new(Mutex::new(HandlerThread::new()))
}
}
impl Handler for HandlerThread {
fn is_started(&mut self) -> bool {
let started_alive = self.started_alive.lock().unwrap();
let &(ref started, _) = &*started_alive;
started.load(Ordering::SeqCst)
}
fn is_alive(&mut self) -> bool {
let started_alive = self.started_alive.lock().unwrap();
let &(_, ref alive) = &*started_alive;
alive.load(Ordering::SeqCst)
}
fn start(&mut self) {
{
let started_alive = self.started_alive.lock().unwrap();
let &(ref started, ref alive) = &*started_alive;
if started.load(Ordering::SeqCst) {
return;
}
started.store(true, Ordering::SeqCst);
if alive.load(Ordering::SeqCst) {
return;
}
alive.store(true, Ordering::SeqCst);
}
let mut _inner = self.inner.clone();
let mut this = self.clone();
self.handle = Arc::new(Mutex::new(Some(thread::spawn(move || {
Arc::make_mut(&mut _inner).start();
this.stop();
}))));
}
fn stop(&mut self) {
{
let started_alive = self.started_alive.lock().unwrap();
let &(ref started, ref alive) = &*started_alive;
if !started.load(Ordering::SeqCst) {
return;
}
if !alive.load(Ordering::SeqCst) {
return;
}
alive.store(false, Ordering::SeqCst);
}
Arc::make_mut(&mut self.inner).stop();
}
fn post(&mut self, func: RawFunc) {
Arc::make_mut(&mut self.inner).post(func);
}
}
#[derive(Clone)]
struct HandlerThreadInner {
started: Arc<AtomicBool>,
alive: Arc<AtomicBool>,
q: Arc<fpSync::BlockingQueue<RawFunc>>,
}
impl HandlerThreadInner {
pub fn new() -> HandlerThreadInner {
HandlerThreadInner {
started: Arc::new(AtomicBool::new(false)),
alive: Arc::new(AtomicBool::new(false)),
q: Arc::new(<fpSync::BlockingQueue<RawFunc>>::new()),
}
}
}
impl Handler for HandlerThreadInner {
fn is_started(&mut self) -> bool {
self.started.load(Ordering::SeqCst)
}
fn is_alive(&mut self) -> bool {
self.alive.load(Ordering::SeqCst)
}
fn start(&mut self) {
self.alive.store(true, Ordering::SeqCst);
if self.is_started() {
return;
}
self.started.store(true, Ordering::SeqCst);
let q = Arc::make_mut(&mut self.q);
while self.alive.load(Ordering::SeqCst) {
let v = q.take();
match v {
Some(f) => {
f.invoke();
}
None => {
self.alive.store(false, Ordering::SeqCst);
}
}
}
}
fn stop(&mut self) {
self.alive.store(false, Ordering::SeqCst);
}
fn post(&mut self, func: RawFunc) {
let q = Arc::make_mut(&mut self.q);
q.put(func);
}
}
#[test]
fn test_handler_new() {
use super::sync::CountDownLatch;
use std::time;
let mut _h = HandlerThread::new_with_mutex();
let mut h = _h.lock().unwrap();
assert_eq!(false, h.is_alive());
assert_eq!(false, h.is_started());
h.stop();
h.stop();
assert_eq!(false, h.is_alive());
assert_eq!(false, h.is_started());
h.start();
assert_eq!(true, h.is_alive());
assert_eq!(true, h.is_started());
let latch = CountDownLatch::new(1);
let latch2 = latch.clone();
h.post(RawFunc::new(move || {
println!("Executed !");
let latch3 = latch2.clone();
let mut _h2 = HandlerThread::new_with_mutex();
let mut _h2_inside = _h2.clone();
let mut h2 = _h2.lock().unwrap();
h2.start();
h2.post(RawFunc::new(move || {
latch3.countdown();
{
_h2_inside.lock().unwrap().stop();
}
}));
}));
println!("Test");
thread::sleep(time::Duration::from_millis(1));
assert_eq!(true, h.is_alive());
assert_eq!(true, h.is_started());
h.stop();
thread::sleep(time::Duration::from_millis(1));
assert_eq!(false, h.is_alive());
assert_eq!(true, h.is_started());
latch.clone().wait();
}