1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
use crate::{Core, Fwd, Waker};
use std::collections::VecDeque;
use std::mem;
use std::panic::AssertUnwindSafe;
use std::sync::{Arc, Condvar, Mutex};
// TODO: Reallocate queues occasionally in case they grow huge?
// TODO: Fixed-size queue option to avoid allocations whilst locked?
// Synchronisation is a plain `Mutex` plus a `Condvar`.  Contention
// is expected to be very low, because each operation under the lock
// is quick and only two threads are ever involved.  So hopefully
// almost all lock operations will be resolved in userspace, without
// needing to go to the OS.

/// State shared between a `PipedThread` and its `PipedLink`
struct Queues<O, I>
where
    O: Send + Sync + 'static,
    I: Send + Sync + 'static,
{
    // Cancel flag, panic message and the two message queues
    mutex: Mutex<QueuesInner<O, I>>,
    // Notified on send to the thread and on cancellation; the thread
    // waits on it when blocked in receive
    condvar: Condvar,
}
/// The data protected by the mutex in `Queues`
struct QueuesInner<O, I>
where
    O: Send + Sync + 'static,
    I: Send + Sync + 'static,
{
    // Set if PipedThread instance is dropped
    cancel: bool,
    // Panic that occurred in the thread, or None
    panic: Option<String>,
    // Messages queued for the thread, popped from the front
    send: VecDeque<O>,
    // Messages queued for the main thread, taken as a whole batch
    recv: Vec<I>,
}
/// A thread connected to the actor runtime via channels
///
/// This starts a thread and moves data to and from it over
/// channels.  Values sent to the thread have type `O`, and values
/// received from it have type `I`.  Often these will be enums, so
/// that different kinds of data (e.g. messages, commands or
/// responses as required) can be handled.
///
/// The intended use is offloading synchronous or blocking work to
/// another thread.  Normally the thread blocks on
/// [`PipedLink::recv`] until there is something to process.
/// Processing could mean sending messages on other channels or
/// streams and waiting for replies, or running data in parallel
/// through a thread pool, and it may or may not produce a message to
/// pass back with [`PipedLink::send`].  Another use is blocking
/// input: the thread waits on a device and uses [`PipedLink::send`]
/// to pass back the data it receives.
///
/// The one thing this thread cannot do is wait on both
/// [`PipedLink::recv`] and some other input at the same time.  If
/// you need that, for now you'll need to write your own interface
/// code to `crossbeam` or some other channel library, using
/// [`Waker`] to interface back to **Stakker**.
///
/// Cleanup is handled as follows:
///
/// - If the thread terminates normally or panics, the underlying
///   [`Waker`] notifies the main thread and `fwd_term` is called
///   with the panic error, or with `None` if there was no panic.
///   That handler can discard the [`PipedThread`] instance to
///   complete the cleanup, and start a new thread if necessary.
///
/// - If the [`PipedThread`] instance is dropped in the main thread,
///   a **cancel** flag is set, which the thread will notice the next
///   time it tries to send or receive data.  The thread should then
///   terminate.  So if the [`PipedThread`] instance is kept within
///   the same actor that is handling the incoming data, thread
///   cleanup happens automatically if that actor fails unexpectedly.
///
/// [`PipedLink::recv`]: ../sync/struct.PipedLink.html#method.recv
/// [`PipedLink::send`]: ../sync/struct.PipedLink.html#method.send
/// [`PipedThread`]: ../sync/struct.PipedThread.html
/// [`Waker`]: ../sync/struct.Waker.html
pub struct PipedThread<O, I>
where
    O: Send + Sync + 'static,
    I: Send + Sync + 'static,
{
    // State shared with the spawned thread
    queues: Arc<Queues<O, I>>,
}
impl<O: Send + Sync + 'static, I: Send + Sync + 'static> PipedThread<O, I> {
/// Spawn a new thread. `fwd_recv` will be called for each
/// incoming message. `fwd_term` will be called when the thread
/// terminates with the argument of `None` for normal termination,
/// or `Some(msg)` for a panic. The `run` argument is the closure
/// that will be run within the new thread. The [`PipedLink`]
/// argument passed to it allows the new thread to send and
/// receive messages.
///
/// Note: `core` argument is third argument so that `fwd_to!` and
/// similar macros can be used directly in the call arguments,
/// without borrow errors.
///
/// [`PipedLink`]: ../sync/struct.PipedLink.html
pub fn spawn(
fwd_recv: Fwd<I>,
fwd_term: Fwd<Option<String>>,
core: &mut Core,
run: impl FnOnce(&mut PipedLink<O, I>) + Send + 'static,
) -> Self {
let queues = Arc::new(Queues {
mutex: Mutex::new(QueuesInner {
cancel: false,
panic: None,
send: VecDeque::new(),
recv: Vec::new(),
}),
condvar: Condvar::new(),
});
let qu = queues.clone();
let waker = core.waker(move |_, deleted| {
let mut panic = None;
let mut lock = qu.mutex.lock().unwrap();
let recv = mem::take(&mut lock.recv);
if deleted {
panic = lock.panic.take();
}
drop(lock);
for msg in recv {
fwd_recv.fwd(msg);
}
if deleted {
fwd_term.fwd(panic);
}
});
let mut pipes = PipedLink {
queues: queues.clone(),
waker,
};
std::thread::spawn(move || {
if let Err(e) = std::panic::catch_unwind(AssertUnwindSafe(|| run(&mut pipes))) {
// Pass through panic message if it is a `String` or
// `&str`, else generate some debugging output
let msg = match e.downcast::<String>() {
Ok(v) => *v,
Err(e) => match e.downcast::<&str>() {
Ok(v) => v.to_string(),
Err(e) => format!("Panic with unknown type: {:?}", e.type_id()),
},
};
pipes.queues.mutex.lock().unwrap().panic = Some(msg);
}
// The Waker is dropped here, which will notify the main
// thread of termination
});
Self { queues }
}
/// Send a message to the thread. If the thread is blocked on
/// receive, wake it.
pub fn send(&mut self, msg: O) {
let mut lock = self.queues.mutex.lock().unwrap();
let empty = lock.send.is_empty();
lock.send.push_back(msg);
drop(lock);
if empty {
self.queues.condvar.notify_all();
}
}
}
impl<O, I> Drop for PipedThread<O, I>
where
    O: Send + Sync + 'static,
    I: Send + Sync + 'static,
{
    /// Set the cancel flag and wake anything waiting on the
    /// condition variable so that the flag gets noticed.
    fn drop(&mut self) {
        let shared = &self.queues;
        {
            // Keep the lock only for as long as it takes to set the flag
            let mut state = shared.mutex.lock().unwrap();
            state.cancel = true;
        }
        shared.condvar.notify_all();
    }
}
/// Link from a [`PipedThread`] thread back to the main thread
///
/// [`PipedThread`]: ../sync/struct.PipedThread.html
pub struct PipedLink<O, I>
where
    O: Send + Sync + 'static,
    I: Send + Sync + 'static,
{
    // State shared with the `PipedThread` instance
    queues: Arc<Queues<O, I>>,
    // Used to wake the main thread when a message has been queued
    waker: Waker,
}
impl<O, I> PipedLink<O, I>
where
    O: Send + Sync + 'static,
    I: Send + Sync + 'static,
{
    /// Queue a message for the main thread.  The return value is
    /// `true` on success.  A `false` return means that the
    /// [`PipedThread`] has been dropped and this thread should
    /// terminate itself.
    ///
    /// [`PipedThread`]: ../sync/struct.PipedThread.html
    pub fn send(&mut self, msg: I) -> bool {
        // Do the queue update in its own scope so that the lock is
        // released before waking the main thread
        let (was_empty, cancelled) = {
            let mut guard = self.queues.mutex.lock().unwrap();
            let state = (guard.recv.is_empty(), guard.cancel);
            guard.recv.push(msg);
            state
        };
        // A wake is only issued when the queue goes from empty to
        // non-empty
        if was_empty {
            self.waker.wake();
        }
        !cancelled
    }

    /// Fetch the next message from the main thread, blocking if
    /// nothing is waiting yet.  A `None` return means that the
    /// [`PipedThread`] has been dropped, in which case this thread
    /// should terminate itself.
    ///
    /// [`PipedThread`]: ../sync/struct.PipedThread.html
    pub fn recv(&mut self) -> Option<O> {
        let shared = &self.queues;
        let mut guard = shared.mutex.lock().unwrap();
        loop {
            // Cancellation takes priority over any queued messages
            if guard.cancel {
                return None;
            }
            if let Some(msg) = guard.send.pop_front() {
                return Some(msg);
            }
            guard = shared.condvar.wait(guard).unwrap();
        }
    }

    /// Report whether the main thread has flagged cancellation.  The
    /// cancel flag is set when the [`PipedThread`] is dropped, to
    /// tell this thread to terminate.  A thread that is doing a
    /// long-running operation or blocking should check the
    /// **cancel** flag from time to time, so that it recognises this
    /// condition and cleans up in good time.
    ///
    /// [`PipedThread`]: ../sync/struct.PipedThread.html
    pub fn cancel(&mut self) -> bool {
        let guard = self.queues.mutex.lock().unwrap();
        guard.cancel
    }
}