use crate::{Core, Fwd, Waker};
use std::collections::VecDeque;
use std::mem;
use std::panic::AssertUnwindSafe;
use std::sync::{Arc, Condvar, Mutex};
struct Queues<O: Send + Sync + 'static, I: Send + Sync + 'static> {
mutex: Mutex<QueuesInner<O, I>>,
condvar: Condvar,
}
struct QueuesInner<O: Send + Sync + 'static, I: Send + Sync + 'static> {
cancel: bool, panic: Option<String>, send: VecDeque<O>,
recv: Vec<I>,
}
pub struct PipedThread<O: Send + Sync + 'static, I: Send + Sync + 'static> {
queues: Arc<Queues<O, I>>,
}
impl<O: Send + Sync + 'static, I: Send + Sync + 'static> PipedThread<O, I> {
pub fn spawn(
fwd_recv: Fwd<I>,
fwd_term: Fwd<Option<String>>,
core: &mut Core,
run: impl FnOnce(&mut PipedLink<O, I>) + Send + 'static,
) -> Self {
let queues = Arc::new(Queues {
mutex: Mutex::new(QueuesInner {
cancel: false,
panic: None,
send: VecDeque::new(),
recv: Vec::new(),
}),
condvar: Condvar::new(),
});
let qu = queues.clone();
let waker = core.waker(move |_, deleted| {
let mut panic = None;
let mut lock = qu.mutex.lock().unwrap();
let recv = mem::take(&mut lock.recv);
if deleted {
panic = lock.panic.take();
}
drop(lock);
for msg in recv {
fwd_recv.fwd(msg);
}
if deleted {
fwd_term.fwd(panic);
}
});
let mut pipes = PipedLink {
queues: queues.clone(),
waker,
};
std::thread::spawn(move || {
if let Err(e) = std::panic::catch_unwind(AssertUnwindSafe(|| run(&mut pipes))) {
let msg = match e.downcast::<String>() {
Ok(v) => *v,
Err(e) => match e.downcast::<&str>() {
Ok(v) => v.to_string(),
Err(e) => format!("Panic with unknown type: {:?}", e.type_id()),
},
};
pipes.queues.mutex.lock().unwrap().panic = Some(msg);
}
});
Self { queues }
}
pub fn send(&mut self, msg: O) {
let mut lock = self.queues.mutex.lock().unwrap();
let empty = lock.send.is_empty();
lock.send.push_back(msg);
drop(lock);
if empty {
self.queues.condvar.notify_all();
}
}
}
impl<O: Send + Sync + 'static, I: Send + Sync + 'static> Drop for PipedThread<O, I> {
fn drop(&mut self) {
self.queues.mutex.lock().unwrap().cancel = true;
self.queues.condvar.notify_all();
}
}
pub struct PipedLink<O: Send + Sync + 'static, I: Send + Sync + 'static> {
queues: Arc<Queues<O, I>>,
waker: Waker,
}
impl<O: Send + Sync + 'static, I: Send + Sync + 'static> PipedLink<O, I> {
pub fn send(&mut self, msg: I) -> bool {
let mut lock = self.queues.mutex.lock().unwrap();
let cancel = lock.cancel;
let empty = lock.recv.is_empty();
lock.recv.push(msg);
drop(lock);
if empty {
self.waker.wake();
}
!cancel
}
pub fn recv(&mut self) -> Option<O> {
let mut lock = self.queues.mutex.lock().unwrap();
while !lock.cancel && lock.send.is_empty() {
lock = self.queues.condvar.wait(lock).unwrap();
}
if lock.cancel {
None
} else {
Some(lock.send.pop_front().unwrap())
}
}
pub fn cancel(&mut self) -> bool {
self.queues.mutex.lock().unwrap().cancel
}
}