1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
use crate::{Core, Fwd, Waker};
use std::sync::{Arc, Mutex};
/// Channel for sending messages to an actor
///
/// A [`Channel`] may be used to send messages of type `M` to an actor
/// from any thread. It is an unbounded queue.
///
/// Messages are delivered directly to an actor method via a [`Fwd`]
/// instance. Cleanup of the channel is handled via a
/// [`ChannelGuard`] which should be kept in the same actor. When
/// this is dropped, the channel is closed, and senders are informed
/// via the [`Channel::send`] and [`Channel::is_closed`] methods. So
/// this handles cleanup automatically when the actor fails or
/// terminates for any reason.
///
/// [`Channel::is_closed`]: ../sync/struct.Channel.html#method.is_closed
/// [`Channel::send`]: ../sync/struct.Channel.html#method.send
/// [`ChannelGuard`]: ../sync/struct.ChannelGuard.html
/// [`Channel`]: ../sync/struct.Channel.html
/// [`Fwd`]: ../struct.Fwd.html
pub struct Channel<M: Send> {
arc: Arc<Mutex<ChannelBuf<M>>>,
}
struct ChannelBuf<M: Send> {
queue: Vec<M>,
waker: Option<Waker>, // None if closed
}
impl<M: Send> Channel<M> {
/// Create a new channel that directs messages to an actor using
/// the given `Fwd` instance. Returns the channel and a
/// channel-guard. The [`Channel`] may be cloned as many times as
/// necessary and sent to other threads. The [`ChannelGuard`]
/// should be kept in the actor that receives the messages.
///
/// [`ChannelGuard`]: ../sync/struct.ChannelGuard.html
/// [`Channel`]: ../sync/struct.Channel.html
pub fn new(core: &mut Core, fwd: Fwd<M>) -> (Self, ChannelGuard) {
let arc = Arc::new(Mutex::new(ChannelBuf {
queue: Vec::new(),
waker: None,
}));
let arc1 = arc.clone();
let waker = core.waker(move |_, _| {
let mut guard = arc1.lock().expect("Stakker channel lock poisoned");
let vec = std::mem::take(&mut guard.queue);
let is_open = guard.waker.is_some();
drop(guard);
if is_open {
for msg in vec {
fwd.fwd(msg);
}
}
});
arc.lock().unwrap().waker = Some(waker);
let this = Self { arc };
let guard = ChannelGuard(Box::new(this.clone()));
(this, guard)
}
/// Send a message to destination actor in the `Stakker` thread if
/// the channel is open, and return `true`. If the channel has
/// been closed, returns `false`.
pub fn send(&self, msg: M) -> bool {
let mut guard = self.arc.lock().expect("Stakker channel lock poisoned");
if let Some(ref waker) = guard.waker {
if guard.queue.is_empty() {
waker.wake();
}
guard.queue.push(msg);
true
} else {
false
}
}
/// Tests whether the channel has been closed.
pub fn is_closed(&self) -> bool {
let guard = self.arc.lock().expect("Stakker channel lock poisoned");
guard.waker.is_none()
}
}
impl<M: Send> Clone for Channel<M> {
/// Get another reference to the same channel
fn clone(&self) -> Self {
Self {
arc: self.arc.clone(),
}
}
}
trait Closable {
fn close(&self);
}
impl<M: 'static + Send> Closable for Channel<M> {
fn close(&self) {
let mut guard = self.arc.lock().expect("Stakker channel lock poisoned");
guard.waker.take();
guard.queue = Vec::new();
}
}
/// Guard for a channel
///
/// When this is dropped, the associated [`Channel`] is closed and any
/// pending messages are dropped. This should be kept in the actor
/// that receives messages so that any failure or other termination of
/// the actor results in correct cleanup.
///
/// [`Channel`]: ../sync/struct.Channel.html
pub struct ChannelGuard(Box<dyn Closable>);
impl Drop for ChannelGuard {
/// Close the channel and drop any pending messages
fn drop(&mut self) {
self.0.close();
}
}